diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3625900cd265db29eddc1ed8a8d27fad859415f3..cdcea63c88082174438c747023402fd965ca13da 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -30,10 +30,10 @@ extern "C" { #include "tsqlfunction.h" #include "tutil.h" +#include "qExecutor.h" #include "qsqlparser.h" #include "qsqltype.h" #include "qtsbuf.h" -#include "queryExecutor.h" // forward declaration struct SSqlInfo; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 23436fe6a589a805a165da4d1260b5c97dc3fa84..ec9655a66e49648a509bcd0b6ebdf561d971b37e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -32,7 +32,7 @@ extern "C" { #define TSKEY int64_t #endif -#define TSWINDOW_INITIALIZER {INT64_MIN, INT64_MAX}; +#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX}) #define TSKEY_INITIAL_VAL INT64_MIN // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 32e35416927d144cdf3dabaa51e29ada6d390ac9..b1877838ca2b291b1109e85d5860c0de7a44b0f9 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -167,12 +167,6 @@ typedef struct { SArray *pGroupList; } STableGroupInfo; -typedef struct { -} SFields; - -#define TSDB_TS_GREATER_EQUAL 1 -#define TSDB_TS_LESS_EQUAL 2 - typedef struct SQueryRowCond { int32_t rel; TSKEY ts; diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/qExecutor.h similarity index 100% rename from src/query/inc/queryExecutor.h rename to src/query/inc/qExecutor.h diff --git a/src/query/inc/queryUtil.h b/src/query/inc/qUtil.h similarity index 100% rename from src/query/inc/queryUtil.h rename to src/query/inc/qUtil.h diff --git a/src/query/inc/tlosertree.h b/src/query/inc/tlosertree.h index 197d27a76126097f23cfc47a227957301f5b2c5c..4c731625dd5c7950c321b2180ca913e49362059b 100644 --- a/src/query/inc/tlosertree.h +++ b/src/query/inc/tlosertree.h @@ -32,7 +32,7 @@ typedef struct SLoserTreeNode { typedef struct SLoserTreeInfo { int32_t numOfEntries; int32_t totalEntries; - __merge_compare_fn_t comparaFn; + __merge_compare_fn_t comparFn; void * param; SLoserTreeNode *pNode; diff --git a/src/query/src/queryExecutor.c b/src/query/src/qExecutor.c similarity index 99% rename from src/query/src/queryExecutor.c rename to src/query/src/qExecutor.c index 913cd4280c4661e8c658ea60d9d5409c0ca7ebf5..81b179fbe636fb11f0b19f3e26d1b55888d4b0c1 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/qExecutor.c @@ -17,18 +17,18 @@ #include "hash.h" #include "hashfunc.h" +#include "qExecutor.h" +#include "qUtil.h" #include "qast.h" #include "qresultBuf.h" #include "query.h" -#include "queryExecutor.h" #include "queryLog.h" -#include "queryUtil.h" #include "taosmsg.h" +#include "tdataformat.h" #include "tlosertree.h" +#include "tscUtil.h" // todo move the function to common module #include "tscompression.h" #include "ttime.h" -#include "tscUtil.h" // todo move the function to common module -#include "tdataformat.h" #define DEFAULT_INTERN_BUF_SIZE 16384L diff --git a/src/query/src/queryFilterFunc.c b/src/query/src/qFilterFunc.c similarity index 99% rename from src/query/src/queryFilterFunc.c rename to src/query/src/qFilterFunc.c index 41a888e92d6134f004afce614719e07376b8a198..bcc9531c4e888e8a5bf70a363cb4b148df0aac53 100644 --- a/src/query/src/queryFilterFunc.c +++ b/src/query/src/qFilterFunc.c @@ -16,10 +16,10 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qExecutor.h" #include "taosmsg.h" -#include "tsqlfunction.h" -#include "queryExecutor.h" #include "tcompare.h" +#include "tsqlfunction.h" bool less_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { return (*(int8_t *)minval < pFilter->filterInfo.upperBndi); diff --git a/src/query/src/queryUtil.c b/src/query/src/qUtil.c similarity index 99% rename from src/query/src/queryUtil.c rename to src/query/src/qUtil.c index 9da02f9f0f9d97c5087027309bde844c09589ecd..87ad9dbeb4601e08c92de4ba98f296c3eb30c0db 100644 --- a/src/query/src/queryUtil.c +++ b/src/query/src/qUtil.c @@ -23,8 +23,8 @@ #include "qinterpolation.h" #include "ttime.h" -#include "queryExecutor.h" -#include "queryUtil.h" +#include "qExecutor.h" +#include "qUtil.h" int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t threshold, int16_t type) { diff --git a/src/query/src/qast.c b/src/query/src/qast.c index f727acb72096e66bba6f377be20bcc3b46187df0..d784fa4102194558c7e3149d07491c2c6e319d4f 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -476,44 +476,6 @@ typedef struct { SEndPoint* end; } SQueryCond; -//static void setInitialValueForRangeQueryCondition(tSKipListQueryCond *q, int8_t type) { -// q->lowerBndRelOptr = TSDB_RELATION_GREATER; -// q->upperBndRelOptr = TSDB_RELATION_LESS; -// -// switch (type) { -// case TSDB_DATA_TYPE_BOOL: -// case TSDB_DATA_TYPE_TINYINT: -// case TSDB_DATA_TYPE_SMALLINT: -// case TSDB_DATA_TYPE_INT: -// case TSDB_DATA_TYPE_BIGINT: { -// q->upperBnd.nType = TSDB_DATA_TYPE_BIGINT; -// q->lowerBnd.nType = TSDB_DATA_TYPE_BIGINT; -// -// q->upperBnd.i64Key = INT64_MAX; -// q->lowerBnd.i64Key = INT64_MIN; -// break; -// }; -// case TSDB_DATA_TYPE_FLOAT: -// case TSDB_DATA_TYPE_DOUBLE: { -// q->upperBnd.nType = TSDB_DATA_TYPE_DOUBLE; -// q->lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE; -// q->upperBnd.dKey = DBL_MAX; -// q->lowerBnd.dKey = -DBL_MIN; -// break; -// }; -// case TSDB_DATA_TYPE_NCHAR: -// case TSDB_DATA_TYPE_BINARY: { -// q->upperBnd.nType = type; -// q->upperBnd.pz = NULL; -// q->upperBnd.nLen = -1; -// -// q->lowerBnd.nType = type; -// q->lowerBnd.pz = NULL; -// q->lowerBnd.nLen = -1; -// } -// } -//} - // todo check for malloc failure static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) { int32_t optr = queryColInfo->optr; @@ -788,7 +750,6 @@ static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTravers taosArrayCopy(pResult, array); } - static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SExprTraverseSupp *param ) { SSkipListIterator* iter = tSkipListCreateIter(pSkipList); @@ -834,8 +795,6 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, tSkipListDestroyIter(iter); } - - // post-root order traverse syntax tree void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) { if (pExpr == NULL) { @@ -1100,7 +1059,6 @@ static char* exception_strdup(const char* str) { return p; } - static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) { int32_t anchor = CLEANUP_GET_ANCHOR(); diff --git a/src/query/src/qinterpolation.c b/src/query/src/qinterpolation.c index c1939badcc697056f39fe8490967607006db4db6..6573f38682062bef41e9d25ecc89b4874a6f16a8 100644 --- a/src/query/src/qinterpolation.c +++ b/src/query/src/qinterpolation.c @@ -185,6 +185,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { : pFillInfo->rowIdx + 1; } +// todo: refactor static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) { return v1 + (v2 - v1) * (k - k1) / (k2 - k1); } @@ -449,14 +450,6 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO } } -void taosFillInfoSetSource(SFillInfo* pFillInfo, tFilePage **data, TSKEY endKey) { - pFillInfo->endKey = endKey; - - for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - memcpy(pFillInfo->pData[i], data[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); - } -} - void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? diff --git a/src/query/src/tlosertree.c b/src/query/src/tlosertree.c index 4e3063da03c675999c4f00cae4163b3d163baf37..0d81f4604bfde20548a8eedf0e323a5a7125adf0 100644 --- a/src/query/src/tlosertree.c +++ b/src/query/src/tlosertree.c @@ -54,7 +54,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa (*pTree)->numOfEntries = numOfEntries; (*pTree)->totalEntries = totalEntries; (*pTree)->param = param; - (*pTree)->comparaFn = compareFn; + (*pTree)->comparFn = compareFn; // set initial value for loser tree tLoserTreeInit(*pTree); @@ -95,7 +95,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { return; } - int32_t ret = pTree->comparaFn(&pTree->pNode[parentId], &kLeaf, pTree->param); + int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param); if (ret < 0) { SLoserTreeNode t = pTree->pNode[parentId]; pTree->pNode[parentId] = kLeaf; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a5cba70219a542963cc92b7e276866a8f1461c53..c84b55f11081fda7ea97bb6e8a06cd84ad50b2ab 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -49,6 +49,9 @@ typedef struct SQueryFilePos { int32_t slot; int32_t pos; int64_t lastKey; + int32_t rows; + bool mixBlock; + STimeWindow win; } SQueryFilePos; typedef struct SDataBlockLoadInfo { @@ -61,7 +64,6 @@ typedef struct SDataBlockLoadInfo { typedef struct SLoadCompBlockInfo { int32_t tid; /* table tid */ int32_t fileId; - int32_t fileListIndex; } SLoadCompBlockInfo; typedef struct STableCheckInfo { @@ -71,10 +73,13 @@ typedef struct STableCheckInfo { int32_t start; SCompInfo* pCompInfo; int32_t compSize; - int32_t numOfBlocks; // number of qualified data blocks not the original blocks + int32_t numOfBlocks; // number of qualified data blocks not the original blocks SDataCols* pDataCols; - SSkipListIterator* iter; + SSkipListIterator* iter; // skip list iterator + SSkipListIterator* iiter; // imem iterator + + bool hasObtainBuf; // if we should initialize the in-memory skip list iterator } STableCheckInfo; typedef struct { @@ -110,6 +115,7 @@ typedef struct STsdbQueryHandle { SField** pFields; SArray* pColumns; // column list, SColumnInfoData array list bool locateStart; + int32_t outputCapacity; int32_t realNumOfRows; SArray* pTableCheckInfo; //SArray int32_t activeIndex; @@ -134,7 +140,6 @@ static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->tid = -1; pCompBlockLoadInfo->fileId = -1; - pCompBlockLoadInfo->fileListIndex = -1; } TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) { @@ -149,6 +154,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); pQueryHandle->cur.fid = -1; + pQueryHandle->cur.win = TSWINDOW_INITIALIZER; size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); @@ -186,15 +192,15 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable // allocate buffer in order to load data blocks from file int32_t numOfCols = pCond->numOfCols; - size_t bufferCapacity = 4096; - + pQueryHandle->outputCapacity = 4096; + pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); for (int32_t i = 0; i < pCond->numOfCols; ++i) { - SColumnInfoData pDest = {{0}, 0}; - - pDest.info = pCond->colList[i]; - pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].bytes); - taosArrayPush(pQueryHandle->pColumns, &pDest); + SColumnInfoData colInfo = {{0}, 0}; + + colInfo.info = pCond->colList[i]; + colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); + taosArrayPush(pQueryHandle->pColumns, &colInfo); } tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); @@ -223,6 +229,72 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* return pQueryHandle; } +static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { + STable* pTable = pCheckInfo->pTableObj; + assert(pTable != NULL); + + if (pCheckInfo->hasObtainBuf) { + return true; + } + + pCheckInfo->hasObtainBuf = true; + int32_t order = pHandle->order; + + // no data in buffer, abort + if (pTable->mem == NULL && pTable->imem == NULL) { + return false; + } + + assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); + + if (pTable->mem) { + pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey, + TSDB_DATA_TYPE_TIMESTAMP, order); + } + + if (pTable->imem) { + pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, (const char*) &pCheckInfo->lastKey, + TSDB_DATA_TYPE_TIMESTAMP, order); + } + + // both iterators are NULL, no data in buffer right now + if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) { + return false; + } + + bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); + bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter)); + if (memEmpty && imemEmpty) { // buffer is empty + return false; + } + + if (!memEmpty) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + assert(node != NULL); + + SDataRow row = SL_GET_NODE_DATA(node); + TSKEY key = dataRowKey(row); // first timestamp in buffer + uTrace("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, + pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); + } else { + uTrace("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); + } + + if (!imemEmpty) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); + assert(node != NULL); + + SDataRow row = SL_GET_NODE_DATA(node); + TSKEY key = dataRowKey(row); // first timestamp in buffer + uTrace("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, + pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); + } else { + uTrace("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); + } + + return true; +} + static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); @@ -270,9 +342,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { return true; } -// todo dynamic get the daysperfile -static int32_t getFileIdFromKey(TSKEY key) { - int64_t fid = (int64_t)(key / (10 * tsMsPerDay[0])); // set the starting fileId +static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile) { + int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[0])); // set the starting fileId if (fid > INT32_MAX) { fid = INT32_MAX; } @@ -409,7 +480,7 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS return pLocalIdList; } -static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, +static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, SArray* sa); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); @@ -456,17 +527,17 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock return false; } - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; - assert(pCols->numOfPoints == pBlock->numOfPoints); + SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; + assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfPoints == pBlock->numOfPoints); if (pCheckInfo->lastKey > pBlock->keyFirst) { cur->pos = - binarySearchForKey(pCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); + binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); } else { cur->pos = 0; } - - filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + + mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { // the whole block is loaded in to buffer pQueryHandle->realNumOfRows = pBlock->numOfPoints; cur->pos = 0; @@ -486,7 +557,8 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = pBlock->numOfPoints - 1; } - filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + + mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { pQueryHandle->realNumOfRows = pBlock->numOfPoints; cur->pos = pBlock->numOfPoints - 1; @@ -559,87 +631,208 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return midPos; } +static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo, int32_t capacity, + int32_t numOfRows, int32_t* pos, int32_t endPos) { + char* pData = NULL; + SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + TSKEY* tsArray = pCols->cols[0].pData; + + int32_t numOfCols = pCols->numOfCols; + + int32_t n = (*pos); // todo: the output buffer limitation and the query time window? + while(n < pBlockInfo->rows && n <= endPos && ((n - (*pos) + numOfRows) < capacity)) { n++;} + + int32_t num = n - (*pos); + int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); + + //data in buffer has greater timestamp, copy data in file block + for (int32_t i = 0; i < reqiredNumOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + int32_t bytes = pColInfo->info.bytes; + + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + for (int32_t j = 0; j < numOfCols; ++j) { // todo opt performance + SDataCol* src = &pCols->cols[j]; + + if (pColInfo->info.colId == src->colId) { + + if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { + memmove(pData, src->pData + bytes * (*pos), bytes * num); + } else { // handle the var-string + char* dst = pData; + + // todo refactor, only copy one-by-one + for (int32_t k = (*pos); k < num + (*pos); ++k) { + char* p = tdGetColDataOfRow(src, k); + memcpy(dst, p, varDataTLen(p)); + dst += bytes; + } + } + + break; + } + } + } + + *pos += num; + numOfRows += num; + + pQueryHandle->cur.win.ekey = tsArray[(*pos) - 1]; + pQueryHandle->cur.lastKey = pQueryHandle->cur.win.ekey + 1; // todo ??? + + return numOfRows; +} + +static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, + int32_t numOfRows, SDataRow row, STSchema* pSchema) { + int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); + int32_t numOfTableCols = schemaNCols(pSchema); + + char* pData = NULL; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + + if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + int32_t offset = 0; + for (int32_t j = 0; j < numOfTableCols; ++j) { + if (pColInfo->info.colId == pSchema->columns[j].colId) { + offset = pSchema->columns[j].offset; + break; + } + } + + assert(offset != -1); // todo handle error + void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pData, value, varDataTLen(value)); + } else { + memcpy(pData, value, pColInfo->info.bytes); + } + } +} + // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded -static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, +static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, SArray* sa) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); + initSkipListIterator(pQueryHandle, pCheckInfo); SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; int32_t endPos = cur->pos; if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.rows - 1; - pQueryHandle->realNumOfRows = endPos - cur->pos + 1; - pCheckInfo->lastKey = blockInfo.window.ekey + 1; } else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { endPos = 0; - pQueryHandle->realNumOfRows = cur->pos + 1; - pCheckInfo->lastKey = blockInfo.window.ekey - 1; } else { int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); - if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { - if (endPos < cur->pos) { - pQueryHandle->realNumOfRows = 0; - return; - } else { - pQueryHandle->realNumOfRows = endPos - cur->pos + 1; - } - - pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1; - } else { - if (endPos > cur->pos) { - pQueryHandle->realNumOfRows = 0; - return; - } else { - pQueryHandle->realNumOfRows = cur->pos - endPos + 1; - } - } +// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { +// if (endPos < cur->pos) { +// pQueryHandle->realNumOfRows = 0; +// return; +// } else { +// pQueryHandle->realNumOfRows = endPos - cur->pos + 1; +// } +// +// pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1; +// } else { +// if (endPos > cur->pos) { +// pQueryHandle->realNumOfRows = 0; +// return; +// } else { +// pQueryHandle->realNumOfRows = cur->pos - endPos + 1; +// } +// } } - - int32_t start = MIN(cur->pos, endPos); -// if (start > 0) { -// tdPopDataColsPoints(pQueryHandle->rhelper.pDataCols[0], start); -// } - - // move the data block in the front to data block if needed - int32_t numOfCols = pQueryHandle->rhelper.pDataCols[0]->numOfCols; - int32_t reqCols = taosArrayGetSize(pQueryHandle->pColumns); + // compared with the data from in-memory buffer, to generate the correct timestamp array list + int32_t pos = MIN(cur->pos, endPos); - for (int32_t i = 0; i < reqCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - int32_t bytes = pCol->info.bytes; - - for (int32_t j = 0; j < numOfCols; ++j) { //todo opt performance - SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[j]; - - if (pCol->info.colId == src->colId) { - if (pCol->info.type != TSDB_DATA_TYPE_BINARY && pCol->info.type != TSDB_DATA_TYPE_NCHAR) { - memmove(pCol->pData, src->pData + bytes * start, bytes * pQueryHandle->realNumOfRows); - } else { // handle the var-string - char* dst = pCol->pData; + assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0); + TSKEY* tsArray = pCols->cols[0].pData; - // todo refactor, only copy one-by-one - for(int32_t k = start; k < pQueryHandle->realNumOfRows + start; ++k) { - char* p = tdGetColDataOfRow(src, k); - memcpy(dst, p, varDataTLen(p)); - dst += bytes; - } + int32_t numOfRows = 0; + pQueryHandle->cur.win = TSWINDOW_INITIALIZER; + + // no data in buffer, load data from file directly + if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { + cur->win.skey = tsArray[pos]; + copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); + return; + } else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { + // } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) { + // } else { // iter and iiter are all not NULL, three-way merge data block + STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); + + while (1) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + if (node == NULL) { + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = tsArray[pos]; } + numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); break; } + + SDataRow row = SL_GET_NODE_DATA(node); + TSKEY key = dataRowKey(row); + + if (key < tsArray[pos]) { + copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); + numOfRows += 1; + cur->mixBlock = true; + + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = key; + } + + cur->win.ekey = key; + tSkipListIterNext(pCheckInfo->iter); + + if (numOfRows >= pQueryHandle->outputCapacity) { + break; + } + } else if (key == tsArray[pos]) { //data in buffer has the same timestamp of data in file block, ignore it + tSkipListIterNext(pCheckInfo->iter); + } else if (key > tsArray[pos]) { + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = tsArray[pos]; + } + + numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); + + if (numOfRows >= pQueryHandle->outputCapacity || + pQueryHandle->cur.lastKey >= blockInfo.window.ekey || + pQueryHandle->cur.lastKey > pQueryHandle->window.ekey) { + break; + } + } } } - assert(pQueryHandle->realNumOfRows <= blockInfo.rows); + pCheckInfo->lastKey = cur->lastKey; + pQueryHandle->realNumOfRows = numOfRows; + cur->rows = numOfRows; + cur->pos = pos; - // forward(backward) the position for cursor - cur->pos = endPos; + uTrace("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" %p", pQueryHandle, cur->win.skey, + cur->win.ekey, cur->rows, pQueryHandle->qinfo); } int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { @@ -879,7 +1072,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { // no data in file anymore if (pQueryHandle->numOfBlocks <= 0) { assert(pQueryHandle->pFileGroup == NULL); - cur->fid = -1; + cur->fid = -1; // denote that there are no data in file anymore return false; } @@ -888,10 +1081,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - SCompBlock* pBlock = pBlockInfo->pBlock.compBlock; - - return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); + return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); } static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { @@ -901,30 +1091,34 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { // find the start data block in file if (!pQueryHandle->locateStart) { pQueryHandle->locateStart = true; - - int32_t fid = getFileIdFromKey(pQueryHandle->window.skey); + int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pQueryHandle->pTsdb->config.daysPerFile); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); return getDataBlocksInFilesImpl(pQueryHandle); } else { - if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || - (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all blocks - - return getDataBlocksInFilesImpl(pQueryHandle); - } else { // next block of the same file - int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1; - cur->slot += step; - - STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { - cur->pos = 0; - } else { - cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1; + // check if current file block is all consumed + STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; + STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; + + // current block is done, try next + if (!cur->mixBlock || cur->pos >= pBlockInfo->pBlock.compBlock->numOfPoints) { + if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || + (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { + // all data blocks in current file has been checked already, try next file if exists + return getDataBlocksInFilesImpl(pQueryHandle); + } else { // next block of the same file + int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1; + cur->slot += step; + + STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; + return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); } - - return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); + } else { + SArray* sa = getDefaultLoadColumns(pQueryHandle, true); + mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); + return pQueryHandle->pColumns; } } } @@ -1032,7 +1226,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY STsdbQueryHandle* pQueryHandle) { int numOfRows = 0; int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); - *skey = INT64_MIN; + *skey = TSKEY_INITIAL_VAL; do { SSkipListNode* node = tSkipListIterGet(pIter); @@ -1117,34 +1311,89 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; STable* pTable = NULL; - - TSKEY skey = 0, ekey = 0; int32_t rows = 0; int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1; - // data in file + // there are data in file if (pHandle->cur.fid >= 0) { STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; + STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - pTable = pBlockInfo->pTableCheckInfo->pTableObj; - - SDataBlockInfo binfo = getTrueDataBlockInfo(pBlockInfo->pTableCheckInfo, pBlockInfo->pBlock.compBlock); - if (binfo.rows == pHandle->realNumOfRows) { - pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + 1; - return binfo; + pTable = pCheckInfo->pTableObj; + + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock); + /*bool hasData = */initSkipListIterator(pHandle, pCheckInfo); + + TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; + if (pCheckInfo->iter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + SDataRow row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + + if (k1 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iter)) { + node = tSkipListIterGet(pCheckInfo->iter); + row = SL_GET_NODE_DATA(node); + k1 = dataRowKey(row); + } else { + k1 = TSKEY_INITIAL_VAL; + } + } + } + + if (pCheckInfo->iiter != NULL) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); + SDataRow row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + + if (k2 == binfo.window.skey) { + if (tSkipListIterNext(pCheckInfo->iiter)) { + node = tSkipListIterGet(pCheckInfo->iiter); + row = SL_GET_NODE_DATA(node); + k2 = dataRowKey(row); + } else { + k2 = TSKEY_INITIAL_VAL; + } + } + } + + assert(0); + if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) { + doLoadFileDataBlock(pHandle, pBlockInfo->pBlock.compBlock, pCheckInfo); + + SArray* sa = getDefaultLoadColumns(pHandle, true); + mergeDataInDataBlock(pHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); + taosArrayDestroy(sa); + + SDataBlockInfo blockInfo = { + .uid = pTable->tableId.uid, + .tid = pTable->tableId.tid, + .rows = pHandle->cur.rows, + .window = pHandle->cur.win, + }; + + return blockInfo; } else { - /* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the - * block next function + /* + * no data in mem or imem, or data in mem|imem with greater timestamp, no need to load data in buffer + * return the file block info directly */ - SColumnInfoData* pColInfoEx = taosArrayGet(pHandle->pColumns, 0); - - rows = pHandle->realNumOfRows; - skey = *(TSKEY*)pColInfoEx->pData; - ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1)); - - // update the last key value - pBlockInfo->pTableCheckInfo->lastKey = ekey + step; + if (!pHandle->cur.mixBlock && pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) { + pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step; + assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints); + + return binfo; + } else { + SDataBlockInfo blockInfo = { + .uid = pTable->tableId.uid, + .tid = pTable->tableId.tid, + .rows = pHandle->cur.rows, + .window = pHandle->cur.win, + }; + + return blockInfo; + } } } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); @@ -1153,21 +1402,24 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); - rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, 4000, &skey, &ekey, pHandle); + STimeWindow* win = &pHandle->cur.win; + + rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, + pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API // update the last key value - pCheckInfo->lastKey = ekey + step; + pCheckInfo->lastKey = win->ekey + step; } + + SDataBlockInfo blockInfo = { + .uid = pTable->tableId.uid, + .tid = pTable->tableId.tid, + .rows = rows, + .window = pHandle->cur.win, + }; + + return blockInfo; } - - SDataBlockInfo blockInfo = { - .uid = pTable->tableId.uid, - .tid = pTable->tableId.tid, - .rows = rows, - .window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)} - }; - - return blockInfo; } // return null for data block in cache @@ -1189,12 +1441,12 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot]; STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo; - SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock); - assert(pHandle->realNumOfRows <= binfo.rows); - - if (pHandle->realNumOfRows < binfo.rows) { + if (pHandle->cur.mixBlock) { return pHandle->pColumns; } else { + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock); + assert(pHandle->realNumOfRows <= binfo.rows); + // data block has been loaded, todo extract method SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; @@ -1206,7 +1458,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); SArray* sa = getDefaultLoadColumns(pHandle, true); - filterDataInDataBlock(pHandle, pCheckInfo, pBlock, sa); + mergeDataInDataBlock(pHandle, pCheckInfo, pBlock, sa); taosArrayDestroy(sa); return pHandle->pColumns; @@ -1631,7 +1883,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { size_t cols = taosArrayGetSize(pQueryHandle->pColumns); for (int32_t i = 0; i < cols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - tfree(pColInfo->pData); + tfree(pColInfo->pData); } taosArrayDestroy(pQueryHandle->pColumns); diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 70b1b2067f43c11a734a6a13dfec997a6971398b..76bfa5949e4a09afecc6f8c3bba5a408551cc84b 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -74,9 +74,8 @@ int main(int argc, char *argv[]) { printf("success to connect to server\n"); doQuery(taos, "create database if not exists test"); - doQuery(taos, "create database if not exists test"); -// doQuery(taos, "use test"); -// doQuery(taos, "select sum(k)*max(k), sum(k), max(k) from tm99"); + doQuery(taos, "use test"); + doQuery(taos, "select count(*) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:1:59' interval(500a) fill(value, 99)"); // doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))"); // for(int32_t i = 0; i< 100000; ++i) {