提交 af407cb5 编写于 作者: H Haojun Liao

[td-225] fix bugs in case of ts disorder in mem/imem and data in file

上级 9376be5f
......@@ -30,10 +30,10 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#include "qExecutor.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "queryExecutor.h"
// forward declaration
struct SSqlInfo;
......
......@@ -32,7 +32,7 @@ extern "C" {
#define TSKEY int64_t
#endif
#define TSWINDOW_INITIALIZER {INT64_MIN, INT64_MAX};
#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX})
#define TSKEY_INITIAL_VAL INT64_MIN
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
......
......@@ -167,12 +167,6 @@ typedef struct {
SArray *pGroupList;
} STableGroupInfo;
typedef struct {
} SFields;
#define TSDB_TS_GREATER_EQUAL 1
#define TSDB_TS_LESS_EQUAL 2
typedef struct SQueryRowCond {
int32_t rel;
TSKEY ts;
......
......@@ -32,7 +32,7 @@ typedef struct SLoserTreeNode {
typedef struct SLoserTreeInfo {
int32_t numOfEntries;
int32_t totalEntries;
__merge_compare_fn_t comparaFn;
__merge_compare_fn_t comparFn;
void * param;
SLoserTreeNode *pNode;
......
......@@ -17,18 +17,18 @@
#include "hash.h"
#include "hashfunc.h"
#include "qExecutor.h"
#include "qUtil.h"
#include "qast.h"
#include "qresultBuf.h"
#include "query.h"
#include "queryExecutor.h"
#include "queryLog.h"
#include "queryUtil.h"
#include "taosmsg.h"
#include "tdataformat.h"
#include "tlosertree.h"
#include "tscUtil.h" // todo move the function to common module
#include "tscompression.h"
#include "ttime.h"
#include "tscUtil.h" // todo move the function to common module
#include "tdataformat.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
......
......@@ -16,10 +16,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qExecutor.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#include "queryExecutor.h"
#include "tcompare.h"
#include "tsqlfunction.h"
bool less_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
return (*(int8_t *)minval < pFilter->filterInfo.upperBndi);
......
......@@ -23,8 +23,8 @@
#include "qinterpolation.h"
#include "ttime.h"
#include "queryExecutor.h"
#include "queryUtil.h"
#include "qExecutor.h"
#include "qUtil.h"
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
......
......@@ -476,44 +476,6 @@ typedef struct {
SEndPoint* end;
} SQueryCond;
//static void setInitialValueForRangeQueryCondition(tSKipListQueryCond *q, int8_t type) {
// q->lowerBndRelOptr = TSDB_RELATION_GREATER;
// q->upperBndRelOptr = TSDB_RELATION_LESS;
//
// switch (type) {
// case TSDB_DATA_TYPE_BOOL:
// case TSDB_DATA_TYPE_TINYINT:
// case TSDB_DATA_TYPE_SMALLINT:
// case TSDB_DATA_TYPE_INT:
// case TSDB_DATA_TYPE_BIGINT: {
// q->upperBnd.nType = TSDB_DATA_TYPE_BIGINT;
// q->lowerBnd.nType = TSDB_DATA_TYPE_BIGINT;
//
// q->upperBnd.i64Key = INT64_MAX;
// q->lowerBnd.i64Key = INT64_MIN;
// break;
// };
// case TSDB_DATA_TYPE_FLOAT:
// case TSDB_DATA_TYPE_DOUBLE: {
// q->upperBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q->lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q->upperBnd.dKey = DBL_MAX;
// q->lowerBnd.dKey = -DBL_MIN;
// break;
// };
// case TSDB_DATA_TYPE_NCHAR:
// case TSDB_DATA_TYPE_BINARY: {
// q->upperBnd.nType = type;
// q->upperBnd.pz = NULL;
// q->upperBnd.nLen = -1;
//
// q->lowerBnd.nType = type;
// q->lowerBnd.pz = NULL;
// q->lowerBnd.nLen = -1;
// }
// }
//}
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
......@@ -788,7 +750,6 @@ static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTravers
taosArrayCopy(pResult, array);
}
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SExprTraverseSupp *param ) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
......@@ -834,8 +795,6 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
tSkipListDestroyIter(iter);
}
// post-root order traverse syntax tree
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
......@@ -1100,7 +1059,6 @@ static char* exception_strdup(const char* str) {
return p;
}
static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
int32_t anchor = CLEANUP_GET_ANCHOR();
......
......@@ -185,6 +185,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
: pFillInfo->rowIdx + 1;
}
// todo: refactor
/*
 * Linear interpolation: evaluate, at key k, the straight line that passes
 * through the points (k1, v1) and (k2, v2).
 * No check is made for k1 == k2; the division is carried out as written.
 */
static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) {
  const double valueSpan = v2 - v1;  // rise between the two known points
  const double keyOffset = k - k1;   // distance of the requested key from the first point
  const double keySpan = k2 - k1;    // run between the two known points

  // evaluated as (valueSpan * keyOffset) / keySpan
  return v1 + valueSpan * keyOffset / keySpan;
}
......@@ -449,14 +450,6 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO
}
}
/*
 * Record the end key for the fill operation and copy the source rows of every
 * column from the given pages into the fill info's own per-column buffers.
 * The number of bytes copied per column is numOfRows * the column's byte width.
 */
void taosFillInfoSetSource(SFillInfo* pFillInfo, tFilePage **data, TSKEY endKey) {
  pFillInfo->endKey = endKey;

  int32_t col = 0;
  while (col < pFillInfo->numOfCols) {
    size_t nbytes = pFillInfo->numOfRows * pFillInfo->pFillCol[col].col.bytes;
    memcpy(pFillInfo->pData[col], data[col]->data, nbytes);
    col += 1;
  }
}
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
......
......@@ -54,7 +54,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
(*pTree)->numOfEntries = numOfEntries;
(*pTree)->totalEntries = totalEntries;
(*pTree)->param = param;
(*pTree)->comparaFn = compareFn;
(*pTree)->comparFn = compareFn;
// set initial value for loser tree
tLoserTreeInit(*pTree);
......@@ -95,7 +95,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
return;
}
int32_t ret = pTree->comparaFn(&pTree->pNode[parentId], &kLeaf, pTree->param);
int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param);
if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf;
......
......@@ -49,6 +49,9 @@ typedef struct SQueryFilePos {
int32_t slot;
int32_t pos;
int64_t lastKey;
int32_t rows;
bool mixBlock;
STimeWindow win;
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
......@@ -61,7 +64,6 @@ typedef struct SDataBlockLoadInfo {
// Identifiers describing a comp-block load: a table id, a file id and a file list index.
// tsdbInitCompBlockLoadInfo() sets every field to -1.
// NOTE(review): presumably -1 means "nothing loaded yet" — confirm against the readers of this struct.
typedef struct SLoadCompBlockInfo {
int32_t tid; /* table tid */
int32_t fileId;
int32_t fileListIndex;
} SLoadCompBlockInfo;
typedef struct STableCheckInfo {
......@@ -71,10 +73,13 @@ typedef struct STableCheckInfo {
int32_t start;
SCompInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
SSkipListIterator* iter;
SSkipListIterator* iter; // skip list iterator
SSkipListIterator* iiter; // imem iterator
bool hasObtainBuf; // if we should initialize the in-memory skip list iterator
} STableCheckInfo;
typedef struct {
......@@ -110,6 +115,7 @@ typedef struct STsdbQueryHandle {
SField** pFields;
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
int32_t outputCapacity;
int32_t realNumOfRows;
SArray* pTableCheckInfo; //SArray<STableCheckInfo>
int32_t activeIndex;
......@@ -134,7 +140,6 @@ static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
/* Reset a comp-block load record: every identifier field is set to -1. */
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  SLoadCompBlockInfo* info = pCompBlockLoadInfo;

  info->fileListIndex = -1;
  info->fileId = -1;
  info->tid = -1;
}
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
......@@ -149,6 +154,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
......@@ -186,15 +192,15 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
// allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols;
size_t bufferCapacity = 4096;
pQueryHandle->outputCapacity = 4096;
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData pDest = {{0}, 0};
pDest.info = pCond->colList[i];
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &pDest);
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &colInfo);
}
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
......@@ -223,6 +229,72 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
return pQueryHandle;
}
/*
 * Create, at most once per table, the skip-list iterators over the table's
 * mem and imem buffers, both positioned from pCheckInfo->lastKey in the
 * traverse order of the query handle, and advance each to its first node.
 *
 * Returns true when this set-up was already attempted before (hasObtainBuf),
 * or when at least one of the two iterators yields a first node.
 * Returns false when the table has neither mem nor imem, when no iterator
 * could be created, or when both iterators are empty.
 */
static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
  STable* pTable = pCheckInfo->pTableObj;
  assert(pTable != NULL);

  // the set-up below runs only on the first call for this table
  if (pCheckInfo->hasObtainBuf) {
    return true;
  }
  pCheckInfo->hasObtainBuf = true;

  int32_t order = pHandle->order;

  // neither buffer exists: nothing to iterate
  if (pTable->mem == NULL && pTable->imem == NULL) {
    return false;
  }

  assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);

  const char* startKey = (const char*) &pCheckInfo->lastKey;

  if (pTable->mem) {
    pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, startKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pTable->imem) {
    pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, startKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  // no iterator was produced for either buffer
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // step each existing iterator onto its first node; a failed step means that buffer has no data
  bool memEmpty = (pCheckInfo->iter == NULL) || !tSkipListIterNext(pCheckInfo->iter);
  bool imemEmpty = (pCheckInfo->iiter == NULL) || !tSkipListIterNext(pCheckInfo->iiter);

  if (memEmpty && imemEmpty) {
    return false;
  }

  if (memEmpty) {
    uTrace("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* memNode = tSkipListIterGet(pCheckInfo->iter);
    assert(memNode != NULL);

    SDataRow memRow = SL_GET_NODE_DATA(memNode);
    TSKEY memKey = dataRowKey(memRow);  // timestamp of the first row the mem iterator points at
    uTrace("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, memKey, order, pHandle->qinfo);
  }

  if (imemEmpty) {
    uTrace("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* imemNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(imemNode != NULL);

    SDataRow imemRow = SL_GET_NODE_DATA(imemNode);
    TSKEY imemKey = dataRowKey(imemRow);  // timestamp of the first row the imem iterator points at
    uTrace("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, imemKey, order, pHandle->qinfo);
  }

  return true;
}
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
......@@ -270,9 +342,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
return true;
}
// todo dynamic get the daysperfile
static int32_t getFileIdFromKey(TSKEY key) {
int64_t fid = (int64_t)(key / (10 * tsMsPerDay[0])); // set the starting fileId
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile) {
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[0])); // set the starting fileId
if (fid > INT32_MAX) {
fid = INT32_MAX;
}
......@@ -409,7 +480,7 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
return pLocalIdList;
}
static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
......@@ -456,17 +527,17 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
return false;
}
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfPoints == pBlock->numOfPoints);
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfPoints == pBlock->numOfPoints);
if (pCheckInfo->lastKey > pBlock->keyFirst) {
cur->pos =
binarySearchForKey(pCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = 0;
}
filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else { // the whole block is loaded in to buffer
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
cur->pos = 0;
......@@ -486,7 +557,8 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfPoints - 1;
}
filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
cur->pos = pBlock->numOfPoints - 1;
......@@ -559,87 +631,208 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
return midPos;
}
// Copy a run of rows from the loaded file block (rhelper.pDataCols[0]) into the output
// column buffers of the query handle (pQueryHandle->pColumns).
//
// The run starts at row *pos of the file block and extends while the row index is below
// pBlockInfo->rows, not greater than endPos, and the output (numOfRows rows already present)
// still has room below capacity.
//
// On return *pos has been advanced past the copied rows, cur.win.ekey holds the timestamp of
// the row just before the new *pos, and cur.lastKey is that timestamp + 1.
// Returns numOfRows plus the number of rows copied.
//
// NOTE(review): if no row is copied and *pos is 0, tsArray[(*pos) - 1] indexes before the
// start of the timestamp array — confirm that callers never reach this state.
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo, int32_t capacity,
int32_t numOfRows, int32_t* pos, int32_t endPos) {
char* pData = NULL;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t numOfCols = pCols->numOfCols;
int32_t n = (*pos); // todo: the output buffer limitation and the query time window?
// advance n to one past the last row that may be copied
while(n < pBlockInfo->rows && n <= endPos && ((n - (*pos) + numOfRows) < capacity)) { n++;}
// number of rows to copy in this call
int32_t num = n - (*pos);
int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
// data in buffer has a greater timestamp, so copy the data from the file block
for (int32_t i = 0; i < reqiredNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
int32_t bytes = pColInfo->info.bytes;
// pick the destination slot in the output column according to the traverse order
// NOTE(review): in the descending branch the destination is slot (capacity - numOfRows - 1),
// yet the copy below writes num rows forward from it — looks like it could pass the end of
// the buffer when num > 1; confirm.
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
// find the file-block column with the same column id as the requested output column
for (int32_t j = 0; j < numOfCols; ++j) { // todo opt performance
SDataCol* src = &pCols->cols[j];
if (pColInfo->info.colId == src->colId) {
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
// every type other than BINARY/NCHAR: copy the num rows as one contiguous range
memmove(pData, src->pData + bytes * (*pos), bytes * num);
} else { // handle the var-string
char* dst = pData;
// todo refactor, only copy one-by-one
// each value is copied with its own length (varDataTLen) while the destination advances
// by the column width (bytes)
for (int32_t k = (*pos); k < num + (*pos); ++k) {
char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p));
dst += bytes;
}
}
break;
}
}
}
*pos += num;
numOfRows += num;
// timestamp of the row just before the new *pos becomes the end key of the current window
pQueryHandle->cur.win.ekey = tsArray[(*pos) - 1];
// NOTE(review): lastKey is always ekey + 1 here, independent of the traverse order — confirm for descending queries
pQueryHandle->cur.lastKey = pQueryHandle->cur.win.ekey + 1; // todo ???
return numOfRows;
}
/*
 * Copy one row taken from the in-memory buffer into the output column buffers
 * of the query handle (pQueryHandle->pColumns).
 *
 * pQueryHandle  query handle owning the output columns and the traverse order
 * pCheckInfo    not used by this function
 * capacity      number of row slots in each output column buffer
 * numOfRows     number of rows already placed in the output; selects the slot
 * row           source data row
 * pSchema       table schema used to locate each requested column in the row
 *
 * For ascending traversal the row is written to slot numOfRows, otherwise to
 * slot (capacity - numOfRows - 1).
 */
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity,
                              int32_t numOfRows, SDataRow row, STSchema* pSchema) {
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
  int32_t numOfTableCols = schemaNCols(pSchema);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);

    char* pData = NULL;
    if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
      pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
    } else {
      pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
    }

    // -1 marks "column id not found in the schema". It must not start at 0: 0 is what the
    // lookup stores for a column located at the beginning of the row, so a miss would be
    // indistinguishable from a hit and the check below could never fire.
    int32_t offset = -1;
    for (int32_t j = 0; j < numOfTableCols; ++j) {
      if (pColInfo->info.colId == pSchema->columns[j].colId) {
        offset = pSchema->columns[j].offset;
        break;
      }
    }

    assert(offset != -1);  // todo handle error
    if (offset == -1) {
      // asserts compiled out: leave this output slot untouched rather than copying
      // bytes that belong to a different column
      continue;
    }

    void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
    if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
      // variable-length value: copy only its own length
      memcpy(pData, value, varDataTLen(value));
    } else {
      memcpy(pData, value, pColInfo->info.bytes);
    }
  }
}
// Only return to the client the rows that fall inside the query time window; rows in the same block that lie
// outside the query time window are discarded.
static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
initSkipListIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
int32_t endPos = cur->pos;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1;
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
} else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0;
pQueryHandle->realNumOfRows = cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
} else {
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
if (endPos < cur->pos) {
pQueryHandle->realNumOfRows = 0;
return;
} else {
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
}
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
} else {
if (endPos > cur->pos) {
pQueryHandle->realNumOfRows = 0;
return;
} else {
pQueryHandle->realNumOfRows = cur->pos - endPos + 1;
}
}
// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
// if (endPos < cur->pos) {
// pQueryHandle->realNumOfRows = 0;
// return;
// } else {
// pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
// }
//
// pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
// } else {
// if (endPos > cur->pos) {
// pQueryHandle->realNumOfRows = 0;
// return;
// } else {
// pQueryHandle->realNumOfRows = cur->pos - endPos + 1;
// }
// }
}
int32_t start = MIN(cur->pos, endPos);
// if (start > 0) {
// tdPopDataColsPoints(pQueryHandle->rhelper.pDataCols[0], start);
// }
// move the data block in the front to data block if needed
int32_t numOfCols = pQueryHandle->rhelper.pDataCols[0]->numOfCols;
int32_t reqCols = taosArrayGetSize(pQueryHandle->pColumns);
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = MIN(cur->pos, endPos);
for (int32_t i = 0; i < reqCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
int32_t bytes = pCol->info.bytes;
for (int32_t j = 0; j < numOfCols; ++j) { //todo opt performance
SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[j];
if (pCol->info.colId == src->colId) {
if (pCol->info.type != TSDB_DATA_TYPE_BINARY && pCol->info.type != TSDB_DATA_TYPE_NCHAR) {
memmove(pCol->pData, src->pData + bytes * start, bytes * pQueryHandle->realNumOfRows);
} else { // handle the var-string
char* dst = pCol->pData;
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
TSKEY* tsArray = pCols->cols[0].pData;
// todo refactor, only copy one-by-one
for(int32_t k = start; k < pQueryHandle->realNumOfRows + start; ++k) {
char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p));
dst += bytes;
}
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
// no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
cur->win.skey = tsArray[pos];
copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos);
return;
} else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) {
// } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) {
// } else { // iter and iiter are all not NULL, three-way merge data block
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
while (1) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node == NULL) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos);
break;
}
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row);
if (key < tsArray[pos]) {
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
numOfRows += 1;
cur->mixBlock = true;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
cur->win.ekey = key;
tSkipListIterNext(pCheckInfo->iter);
if (numOfRows >= pQueryHandle->outputCapacity) {
break;
}
} else if (key == tsArray[pos]) { //data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext(pCheckInfo->iter);
} else if (key > tsArray[pos]) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos);
if (numOfRows >= pQueryHandle->outputCapacity ||
pQueryHandle->cur.lastKey >= blockInfo.window.ekey ||
pQueryHandle->cur.lastKey > pQueryHandle->window.ekey) {
break;
}
}
}
}
assert(pQueryHandle->realNumOfRows <= blockInfo.rows);
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
cur->pos = pos;
// forward(backward) the position for cursor
cur->pos = endPos;
uTrace("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" %p", pQueryHandle, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
}
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
......@@ -879,7 +1072,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
// no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) {
assert(pQueryHandle->pFileGroup == NULL);
cur->fid = -1;
cur->fid = -1; // denote that there are no data in file anymore
return false;
}
......@@ -888,10 +1081,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo);
}
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
......@@ -901,30 +1091,34 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
// find the start data block in file
if (!pQueryHandle->locateStart) {
pQueryHandle->locateStart = true;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey);
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pQueryHandle->pTsdb->config.daysPerFile);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
return getDataBlocksInFilesImpl(pQueryHandle);
} else {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all blocks
return getDataBlocksInFilesImpl(pQueryHandle);
} else { // next block of the same file
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1;
cur->slot += step;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
cur->pos = 0;
} else {
cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
// check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// current block is done, try next
if (!cur->mixBlock || cur->pos >= pBlockInfo->pBlock.compBlock->numOfPoints) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle);
} else { // next block of the same file
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo);
}
return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo);
} else {
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa);
return pQueryHandle->pColumns;
}
}
}
......@@ -1032,7 +1226,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
*skey = INT64_MIN;
*skey = TSKEY_INITIAL_VAL;
do {
SSkipListNode* node = tSkipListIterGet(pIter);
......@@ -1117,34 +1311,89 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
STable* pTable = NULL;
TSKEY skey = 0, ekey = 0;
int32_t rows = 0;
int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1;
// data in file
// there are data in file
if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
pTable = pBlockInfo->pTableCheckInfo->pTableObj;
SDataBlockInfo binfo = getTrueDataBlockInfo(pBlockInfo->pTableCheckInfo, pBlockInfo->pBlock.compBlock);
if (binfo.rows == pHandle->realNumOfRows) {
pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + 1;
return binfo;
pTable = pCheckInfo->pTableObj;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock);
/*bool hasData = */initSkipListIterator(pHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
assert(0);
if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) {
doLoadFileDataBlock(pHandle, pBlockInfo->pBlock.compBlock, pCheckInfo);
SArray* sa = getDefaultLoadColumns(pHandle, true);
mergeDataInDataBlock(pHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa);
taosArrayDestroy(sa);
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
.rows = pHandle->cur.rows,
.window = pHandle->cur.win,
};
return blockInfo;
} else {
/* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the
* block next function
/*
* no data in mem or imem, or data in mem|imem with greater timestamp, no need to load data in buffer
* return the file block info directly
*/
SColumnInfoData* pColInfoEx = taosArrayGet(pHandle->pColumns, 0);
rows = pHandle->realNumOfRows;
skey = *(TSKEY*)pColInfoEx->pData;
ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
// update the last key value
pBlockInfo->pTableCheckInfo->lastKey = ekey + step;
if (!pHandle->cur.mixBlock && pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) {
pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step;
assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints);
return binfo;
} else {
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
.rows = pHandle->cur.rows,
.window = pHandle->cur.win,
};
return blockInfo;
}
}
} else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
......@@ -1153,21 +1402,24 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
if (pTable->mem != NULL) {
// create mem table iterator if it is not created yet
assert(pCheckInfo->iter != NULL);
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, 4000, &skey, &ekey, pHandle);
STimeWindow* win = &pHandle->cur.win;
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value
pCheckInfo->lastKey = ekey + step;
pCheckInfo->lastKey = win->ekey + step;
}
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
.rows = rows,
.window = pHandle->cur.win,
};
return blockInfo;
}
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
.rows = rows,
.window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)}
};
return blockInfo;
}
// return null for data block in cache
......@@ -1189,12 +1441,12 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock);
assert(pHandle->realNumOfRows <= binfo.rows);
if (pHandle->realNumOfRows < binfo.rows) {
if (pHandle->cur.mixBlock) {
return pHandle->pColumns;
} else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock);
assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
......@@ -1206,7 +1458,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
SArray* sa = getDefaultLoadColumns(pHandle, true);
filterDataInDataBlock(pHandle, pCheckInfo, pBlock, sa);
mergeDataInDataBlock(pHandle, pCheckInfo, pBlock, sa);
taosArrayDestroy(sa);
return pHandle->pColumns;
......@@ -1631,7 +1883,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
tfree(pColInfo->pData);
}
taosArrayDestroy(pQueryHandle->pColumns);
......
......@@ -74,9 +74,8 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n");
doQuery(taos, "create database if not exists test");
doQuery(taos, "create database if not exists test");
// doQuery(taos, "use test");
// doQuery(taos, "select sum(k)*max(k), sum(k), max(k) from tm99");
doQuery(taos, "use test");
doQuery(taos, "select count(*) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:1:59' interval(500a) fill(value, 99)");
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
// for(int32_t i = 0; i< 100000; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册