未验证 提交 57b73505 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1460 from taosdata/liaohj_2

Liaohj 2
......@@ -20,6 +20,7 @@
#include "dataformat.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
......@@ -148,6 +149,8 @@ typedef struct {
SCompCol cols[];
} SCompData;
STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols);
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
......
......@@ -551,6 +551,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
return tsdb->tsdbMeta;
}
STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo) {
STsdbRepo* tsdb = (STsdbRepo*) pRepo;
return tsdb->tsdbFileH;
}
// Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check precision
......
......@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tlog.h>
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "../../../query/inc/qast.h"
......@@ -28,6 +29,11 @@
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
enum {
QUERY_RANGE_LESS_EQUAL = 0,
QUERY_RANGE_GREATER_EQUAL = 1,
};
typedef struct SField {
// todo need the definition
} SField;
......@@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo {
int32_t fileId;
} SHeaderFileInfo;
typedef struct SQueryHandlePos {
int32_t fileId;
typedef struct SQueryFilePos {
int32_t fid;
int32_t slot;
int32_t pos;
int32_t fileIndex;
} SQueryHandlePos;
int64_t lastKey;
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
int32_t fileListIndex;
......@@ -78,8 +84,12 @@ typedef struct STableCheckInfo {
TSKEY lastKey;
STable * pTableObj;
int64_t offsetInHeaderFile;
int32_t numOfBlocks;
// int32_t numOfBlocks;
int32_t start;
bool checkFirstFileBlock;
SCompIdx* compIndex;
SCompBlock *pBlock;
SSkipListIterator* iter;
} STableCheckInfo;
......@@ -104,8 +114,8 @@ enum {
typedef struct STsdbQueryHandle {
struct STsdbRepo* pTsdb;
int8_t model; // access model, single table model or multi-table model
SQueryHandlePos cur; // current position
SQueryHandlePos start; // the start position, used for secondary/third iteration
SQueryFilePos cur; // current position
SQueryFilePos start; // the start position, used for secondary/third iteration
int32_t unzipBufSize;
char *unzipBuffer;
char *secondaryUnzipBuffer;
......@@ -342,6 +352,344 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) {
return true;
}
// todo dynamic get the daysperfile
static int32_t getFileIdFromKey(TSKEY key) {
return (int32_t)(key / 10); // set the starting fileId
}
static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGroup) {
tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables
SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
} else {
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pBlock);
}
return TSDB_CODE_SUCCESS;
}
static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1;
int32_t midSlot = firstSlot;
while (1) {
numOfBlocks = lastSlot - firstSlot + 1;
midSlot = (firstSlot + (numOfBlocks >> 1));
if (numOfBlocks == 1) break;
if (skey > pBlock[midSlot].keyLast) {
if (numOfBlocks == 2) break;
if ((order == TSQL_SO_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
firstSlot = midSlot + 1;
} else if (skey < pBlock[midSlot].keyFirst) {
if ((order == TSQL_SO_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
lastSlot = midSlot - 1;
} else {
break; // got the slot
}
}
return midSlot;
}
static SDataBlockInfo getTrueBlockInfo(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
SDataBlockInfo info = {{0}, 0};
SCompBlock *pDiskBlock = &pCheckInfo->pBlock[pHandle->cur.slot];
info.window.skey = pDiskBlock->keyFirst;
info.window.ekey = pDiskBlock->keyLast;
info.size = pDiskBlock->numOfPoints;
info.numOfCols = pDiskBlock->numOfCols;
return info;
}
bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
SQueryFilePos *cur = &pQueryHandle->cur;
if (pQueryHandle->cur.fid >= 0) {
int32_t fileIndex = -1;
/*
* 1. ascending order. The last data block of data file
* 2. descending order. The first block of file
*/
if ((step == QUERY_ASC_FORWARD_STEP && (pQueryHandle->cur.slot == pQueryHandle->numOfBlocks - 1)) ||
(step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) {
// temporarily keep the position value, in case of no data qualified when move forwards(backwards)
SQueryFilePos save = pQueryHandle->cur;
// fileIndex = getNextDataFileCompInfo_(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step);
// first data block in the next file
if (fileIndex >= 0) {
cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1;
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1;
// return loadQaulifiedData(pQueryHandle);
} else {// try data in cache
assert(cur->fid == -1);
if (step == QUERY_ASC_FORWARD_STEP) {
// TSKEY nextTimestamp =
// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true);
// if (nextTimestamp < 0) {
// pQueryHandle->cur = save;
// }
// return (nextTimestamp > 0);
}
// no data to check for desc order query, restore the saved position value
pQueryHandle->cur = save;
return false;
}
}
// next block in the same file
int32_t fid = cur->fid;
// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo);
cur->slot += step;
SCompBlock *pBlock = &pQueryHandle->pBlock[cur->slot];
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
// return loadQaulifiedData(pQueryHandle);
} else { // data in cache
// todo continue;
}
}
int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfPoints;
TSKEY *keyList;
if (num <= 0) return -1;
keyList = (TSKEY *)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == 0) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) {
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
SQueryFilePos *cur = &pQueryHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
SDataBlockInfo blockInfo = getTrueBlockInfo(pQueryHandle, pCheckInfo);
int32_t endPos = cur->pos;
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.size - 1;
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0;
pQueryHandle->realNumOfRows = cur->pos + 1;
} else {
// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size, pQueryHandle->window.ekey, pQueryHandle->order);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
if (endPos < cur->pos) {
pQueryHandle->realNumOfRows = 0;
return;
} else {
pQueryHandle->realNumOfRows = endPos - cur->pos;
}
} else {
if (endPos > cur->pos) {
pQueryHandle->realNumOfRows = 0;
return;
} else {
pQueryHandle->realNumOfRows = cur->pos - endPos;
}
}
}
int32_t start = MIN(cur->pos, endPos);
// move the data block in the front to data block if needed
if (start != 0) {
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
for (int32_t i = 0; i < taosArrayGetSize(sa); ++i) {
int16_t colId = *(int16_t *)taosArrayGet(sa, i);
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoEx *pCol = taosArrayGet(pQueryHandle->pColumns, j);
if (pCol->info.colId == colId) {
memmove(pCol->pData, ((char *)pCol->pData) + pCol->info.bytes * start, pQueryHandle->realNumOfRows * pCol->info.bytes);
break;
}
}
}
}
assert(pQueryHandle->realNumOfRows <= blockInfo.size);
// forward(backward) the position for cursor
cur->pos = endPos;
}
static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) {
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid);
pCheckInfo->checkFirstFileBlock = true;
SQueryFilePos* cur = &pQueryHandle->cur;
TSKEY key = pCheckInfo->lastKey;
int32_t index = -1;
// todo add iterator for filegroup
while (1) {
if ((fid = getFileCompInfo(pCheckInfo, fileGroup)) < 0) {
break;
}
int32_t tid = pCheckInfo->tableId.tid;
index = binarySearchForBlockImpl(pCheckInfo->pBlock, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key);
if (type == QUERY_RANGE_GREATER_EQUAL) {
if (key <= pCheckInfo->pBlock[index].keyLast) {
break;
} else {
index = -1;
}
} else {
if (key >= pCheckInfo->pBlock[index].keyFirst) {
break;
} else {
index = -1;
}
}
}
// failed to find qualified point in file, abort
if (index == -1) {
return false;
}
assert(index >= 0 && index < pQueryHandle->numOfBlocks);
// load first data block into memory failed, caused by disk block error
bool blockLoaded = false;
SArray *sa = NULL;
// todo no need to loaded at all
cur->slot = index;
// sa = getDefaultLoadColumns(pQueryHandle, true);
if (tsdbLoadDataBlock(&fileGroup->files[2], &pCheckInfo->pBlock[cur->slot], 1, fid, sa) == 0) {
blockLoaded = true;
}
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
// failed to load data from disk, abort current query
if (blockLoaded == false) {
return false;
}
// todo search qualified points in blk, according to primary key (timestamp) column
// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order);
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
filterDataInDataBlock(pQueryHandle, sa);
return pQueryHandle->realNumOfRows > 0;
}
static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
SQueryFilePos* cur = &pHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
if (!pCheckInfo->checkFirstFileBlock && pFileHandle != NULL) {
int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid);
pCheckInfo->checkFirstFileBlock = true;
if (fileGroup != NULL) {
return getQualifiedDataBlock(pHandle, pCheckInfo, 1);
} else { // no data in file, try cache
return hasMoreDataInCacheForSingleModel(pHandle);
}
} else {
pCheckInfo->checkFirstFileBlock = true;
if (pFileHandle == NULL) {
cur->fid = -1;
}
if (cur->fid == -1 || pFileHandle != NULL) { // try data in cache
return hasMoreDataInCacheForSingleModel(pHandle);
} else {
return true;
}
}
}
static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(numOfTables > 0);
......@@ -372,7 +720,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->model == SINGLE_TABLE_MODEL) {
return hasMoreDataInCacheForSingleModel(pHandle);
return hasMoreDataInFileForSingleTableModel(pHandle);
} else {
return hasMoreDataInCacheForMultiModel(pHandle);
}
......@@ -704,8 +1052,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char buf[TSDB_MAX_TAGS_LEN] = {0};
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
int8_t type = pInfo->sch.type;
......@@ -765,9 +1111,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
// query according to the binary expression
SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)};
SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback,
SBinaryFilterSupp supp = {
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback,
.setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare,
.pExtInfo = &s};
.pExtInfo = &s
};
tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp);
tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo);
......
......@@ -46,7 +46,8 @@ int main(int argc, char *argv[]) {
}
printf("success to connect to server\n");
int32_t code = taos_query(taos, "select * from test.t1");
// int32_t code = taos_query(taos, "insert into test.tm2 values(now, 1)(now+1m,2)(now+2m,3) (now+3m, 4) (now+4m, 5);");
int32_t code = taos_query(taos, "insert into test.tm2 values(now, 99)");
if (code != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册