Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9fab97a0
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9fab97a0
编写于
7月 02, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feat/tsdb_refact' of github.com:taosdata/TDengine into feat/tsdb_refact
上级
576777a6
279b59f8
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
183 addition
and
306 deletion
+183
-306
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+3
-33
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+60
-227
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+51
-22
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+69
-24
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
9fab97a0
...
@@ -35,7 +35,6 @@ extern "C" {
...
@@ -35,7 +35,6 @@ extern "C" {
typedef
struct
TSDBROW
TSDBROW
;
typedef
struct
TSDBROW
TSDBROW
;
typedef
struct
TABLEID
TABLEID
;
typedef
struct
TABLEID
TABLEID
;
typedef
struct
TSDBKEY
TSDBKEY
;
typedef
struct
TSDBKEY
TSDBKEY
;
typedef
struct
KEYINFO
KEYINFO
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
STbData
STbData
;
typedef
struct
STbData
STbData
;
...
@@ -43,7 +42,6 @@ typedef struct SMemTable SMemTable;
...
@@ -43,7 +42,6 @@ typedef struct SMemTable SMemTable;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STable
STable
;
typedef
struct
STable
STable
;
typedef
struct
SMapData
SMapData
;
typedef
struct
SMapData
SMapData
;
typedef
struct
SBlockSMA
SBlockSMA
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlockStatis
SBlockStatis
;
typedef
struct
SBlockStatis
SBlockStatis
;
...
@@ -106,14 +104,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
...
@@ -106,14 +104,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
int32_t
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// KEYINFO
#define tKEYINFOInit() \
((KEYINFO){.maxKey = {.ts = TSKEY_MIN, .version = -1}, \
.minKey = {.ts = TSKEY_MAX, .version = INT64_MAX}, \
.minVerion = INT64_MAX, \
.maxVersion = -1})
int32_t
tPutKEYINFO
(
uint8_t
*
p
,
KEYINFO
*
pKeyInfo
);
int32_t
tGetKEYINFO
(
uint8_t
*
p
,
KEYINFO
*
pKeyInfo
);
// SBlockCol
// SBlockCol
int32_t
tPutBlockCol
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tPutBlockCol
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetBlockCol
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetBlockCol
(
uint8_t
*
p
,
void
*
ph
);
...
@@ -123,6 +113,7 @@ void tBlockReset(SBlock *pBlock);
...
@@ -123,6 +113,7 @@ void tBlockReset(SBlock *pBlock);
int32_t
tPutBlock
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tPutBlock
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetBlock
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetBlock
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tBlockCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tBlockCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
bool
tBlockHasSma
(
SBlock
*
pBlock
);
// SBlockIdx
// SBlockIdx
// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
void
tBlockIdxReset
(
SBlockIdx
*
pBlockIdx
);
void
tBlockIdxReset
(
SBlockIdx
*
pBlockIdx
);
...
@@ -173,6 +164,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
...
@@ -173,6 +164,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
int32_t
tsdbKeyFid
(
TSKEY
key
,
int32_t
minutes
,
int8_t
precision
);
int32_t
tsdbKeyFid
(
TSKEY
key
,
int32_t
minutes
,
int8_t
precision
);
void
tsdbFidKeyRange
(
int32_t
fid
,
int32_t
minutes
,
int8_t
precision
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
void
tsdbFidKeyRange
(
int32_t
fid
,
int32_t
minutes
,
int8_t
precision
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
int32_t
tsdbBuildDeleteSkyline
(
SArray
*
aDelData
,
int32_t
sidx
,
int32_t
eidx
,
SArray
*
aSkyline
);
int32_t
tsdbBuildDeleteSkyline
(
SArray
*
aDelData
,
int32_t
sidx
,
int32_t
eidx
,
SArray
*
aSkyline
);
void
tsdbCalcColDataSMA
(
SColData
*
pColData
,
SColumnDataAgg
*
pColAgg
);
// tsdbMemTable ==============================================================================================
// tsdbMemTable ==============================================================================================
// SMemTable
// SMemTable
int32_t
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
int32_t
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
...
@@ -230,7 +222,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
...
@@ -230,7 +222,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
);
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
);
int32_t
tsdbReadBlockData
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
int32_t
tsdbReadBlockData
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
);
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
);
int32_t
tsdbReadBlockS
MA
(
SDataFReader
*
pReader
,
SBlockSMA
*
pBlkSMA
);
int32_t
tsdbReadBlockS
ma
(
SDataFReader
*
pReader
,
SBlock
*
pBlock
,
SArray
*
aColumnDataAgg
,
uint8_t
**
ppBuf
);
// SDelFWriter
// SDelFWriter
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
...
@@ -287,13 +279,6 @@ struct TSDBKEY {
...
@@ -287,13 +279,6 @@ struct TSDBKEY {
TSKEY
ts
;
TSKEY
ts
;
};
};
struct
KEYINFO
{
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
int64_t
minVerion
;
int64_t
maxVersion
;
};
typedef
struct
SMemSkipListNode
SMemSkipListNode
;
typedef
struct
SMemSkipListNode
SMemSkipListNode
;
struct
SMemSkipListNode
{
struct
SMemSkipListNode
{
int8_t
level
;
int8_t
level
;
...
@@ -477,21 +462,6 @@ struct SDelFile {
...
@@ -477,21 +462,6 @@ struct SDelFile {
int64_t
offset
;
int64_t
offset
;
};
};
typedef
struct
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SColSMA
;
struct
SBlockSMA
{
int32_t
nCol
;
SColSMA
*
aColSMA
;
};
#pragma pack(push, 1)
#pragma pack(push, 1)
struct
SBlockDataHdr
{
struct
SBlockDataHdr
{
uint32_t
delimiter
;
uint32_t
delimiter
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
9fab97a0
...
@@ -147,12 +147,11 @@ struct STsdbReader {
...
@@ -147,12 +147,11 @@ struct STsdbReader {
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
static
int
buildDataBlockFromBufImpl
(
STableBlockScanInfo
*
pBlockScanInfo
,
TSDBKEY
maxKey
,
int32_t
capacity
,
STsdbReader
*
pReader
);
static
int
buildDataBlockFromBufImpl
(
STableBlockScanInfo
*
pBlockScanInfo
,
TSDBKEY
maxKey
,
int32_t
capacity
,
STsdbReader
*
pReader
);
static
TSDBROW
*
getValidRow
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
STsdbReader
*
pReader
);
static
TSDBROW
*
getValidRow
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
STsdbReader
*
pReader
);
static
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
static
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
);
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
);
static
int32_t
doLoadRowsOfIdenticalTsInBuf
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
);
static
int32_t
doLoadRowsOfIdenticalTsInBuf
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
);
static
int32_t
doAppendOneRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
);
static
int32_t
doAppendOneRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
);
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
void
checkU
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
);
static
void
u
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
);
static
void
doMergeMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STbDataIter
*
dIter
,
bool
*
hasVal
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
);
static
void
doMergeMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STbDataIter
*
dIter
,
bool
*
hasVal
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
);
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
);
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
);
...
@@ -1069,64 +1068,6 @@ _error:
...
@@ -1069,64 +1068,6 @@ _error:
// return code;
// return code;
// }
// }
// static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo,
// bool* exists) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// int32_t code = TSDB_CODE_SUCCESS;
// bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// if (asc) {
// // query ended in/started from current block
// if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
// assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
// if (pCheckInfo->lastKey > pBlock->minKey.ts) {
// cur->pos =
// binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = 0;
// }
// assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else { // the whole block is loaded in to buffer
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// } else { // desc order, query ended in current block
// if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
// if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
// cur->pos =
// binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = pBlock->numOfRows - 1;
// }
// assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else {
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// }
// *exists = pTsdbReadHandle->realNumOfRows > 0;
// return code;
// }
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
// int firstPos, lastPos, midPos = -1;
// int firstPos, lastPos, midPos = -1;
...
@@ -1189,94 +1130,6 @@ _error:
...
@@ -1189,94 +1130,6 @@ _error:
// return midPos;
// return midPos;
// }
// }
// static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t
// start,
// int32_t end) {
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// TSKEY* tsArray = pCols->cols[0].pData;
// int32_t num = end - start + 1;
// assert(num >= 0);
// if (num == 0) {
// return numOfRows;
// }
// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// int32_t trueStart = ascScan ? start : end;
// int32_t trueEnd = ascScan ? end : start;
// int32_t step = ascScan ? 1 : -1;
// int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
// // data in buffer has greater timestamp, copy data in file block
// int32_t i = 0, j = 0;
// while (i < requiredNumOfCols && j < pCols->numOfCols) {
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// SDataCol* src = &pCols->cols[j];
// if (src->colId < pColInfo->info.colId) {
// j++;
// continue;
// }
// if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
// if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// // memmove(pData, (char*)src->pData + bytes * start, bytes * num);
// int32_t rowIndex = numOfRows;
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// } else { // handle the var-string
// int32_t rowIndex = numOfRows;
// // todo refactor, only copy one-by-one
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// }
// j++;
// i++;
// } else { // pColInfo->info.colId < src->colId, it is a NULL data
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// }
// while (i < requiredNumOfCols) { // the remain columns are all null data
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
// pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
// return numOfRows + num;
// }
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
// STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
// STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
// pSchema2, bool update, TSKEY* lastRowKey) {
// pSchema2, bool update, TSKEY* lastRowKey) {
...
@@ -1472,42 +1325,6 @@ _error:
...
@@ -1472,42 +1325,6 @@ _error:
// #endif
// #endif
// }
// }
// static void getQualifiedRowsPos(STsdbReader* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
// int32_t* start, int32_t* end) {
// *start = -1;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// int32_t remain = endPos - startPos + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
// } else {
// *end = endPos;
// }
// *start = startPos;
// } else {
// int32_t remain = (startPos - endPos) + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
// } else {
// *end = endPos;
// }
// *start = *end;
// *end = startPos;
// }
// }
// static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, int32_t numOfRows,
// int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// pCheckInfo->lastKey = cur->lastKey;
// pTsdbReadHandle->realNumOfRows = numOfRows;
// cur->rows = numOfRows;
// cur->pos = endPos;
// }
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
...
@@ -2198,18 +2015,22 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
...
@@ -2198,18 +2015,22 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return
-
1
;
return
-
1
;
}
}
static
int32_t
setFileBlockActiveInBlockIter
(
SDataBlockIter
*
pBlockIter
,
int32_t
index
)
{
static
int32_t
setFileBlockActiveInBlockIter
(
SDataBlockIter
*
pBlockIter
,
int32_t
index
,
int32_t
step
)
{
if
(
index
<
0
||
index
>=
pBlockIter
->
numOfBlocks
)
{
if
(
index
<
0
||
index
>=
pBlockIter
->
numOfBlocks
)
{
return
-
1
;
return
-
1
;
}
}
SFileDataBlockInfo
fblock
=
*
(
SFileDataBlockInfo
*
)
taosArrayGet
(
pBlockIter
->
blockList
,
index
);
SFileDataBlockInfo
fblock
=
*
(
SFileDataBlockInfo
*
)
taosArrayGet
(
pBlockIter
->
blockList
,
index
);
pBlockIter
->
index
+=
step
;
if
(
index
!=
pBlockIter
->
index
)
{
taosArrayRemove
(
pBlockIter
->
blockList
,
index
);
taosArrayRemove
(
pBlockIter
->
blockList
,
index
);
taosArrayInsert
(
pBlockIter
->
blockList
,
pBlockIter
->
index
,
&
fblock
);
taosArrayInsert
(
pBlockIter
->
blockList
,
pBlockIter
->
index
,
&
fblock
);
SFileDataBlockInfo
*
pBlockInfo
=
taosArrayGet
(
pBlockIter
->
blockList
,
pBlockIter
->
index
);
SFileDataBlockInfo
*
pBlockInfo
=
taosArrayGet
(
pBlockIter
->
blockList
,
pBlockIter
->
index
);
ASSERT
(
pBlockInfo
->
uid
==
fblock
.
uid
&&
pBlockInfo
->
tbBlockIdx
==
fblock
.
tbBlockIdx
);
ASSERT
(
pBlockInfo
->
uid
==
fblock
.
uid
&&
pBlockInfo
->
tbBlockIdx
==
fblock
.
tbBlockIdx
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2242,7 +2063,14 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
...
@@ -2242,7 +2063,14 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
overlapWithNeighbor
=
overlapWithNeighborBlock
(
pBlock
,
pNeighbor
,
pReader
->
order
);
overlapWithNeighbor
=
overlapWithNeighborBlock
(
pBlock
,
pNeighbor
,
pReader
->
order
);
}
}
return
(
overlapWithNeighbor
||
bool
hasDup
=
false
;
if
(
pBlock
->
nSubBlock
==
1
)
{
hasDup
=
pBlock
->
hasDup
;
}
else
{
hasDup
=
true
;
}
return
(
overlapWithNeighbor
||
hasDup
||
dataBlockPartialRequired
(
&
pReader
->
window
,
&
pReader
->
verRange
,
pBlock
)
||
dataBlockPartialRequired
(
&
pReader
->
window
,
&
pReader
->
verRange
,
pBlock
)
||
keyOverlapFileBlock
(
key
,
pBlock
,
&
pReader
->
verRange
)
||
keyOverlapFileBlock
(
key
,
pBlock
,
&
pReader
->
verRange
)
||
(
pBlock
->
nRow
>
pReader
->
capacity
));
(
pBlock
->
nRow
>
pReader
->
capacity
));
...
@@ -2280,7 +2108,7 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
...
@@ -2280,7 +2108,7 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
if
(
k
.
ts
==
key
)
{
if
(
k
.
ts
==
key
)
{
tRowMerge
(
&
merge
,
pRow
);
tRowMerge
(
&
merge
,
pRow
);
...
@@ -2296,10 +2124,13 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
...
@@ -2296,10 +2124,13 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
S
FileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
S
TableBlockScanInfo
*
pBlockScanInfo
)
{
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlock
*
pBlock
=
taosArrayGet
(
pBlockScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
SRowMerger
merge
=
{
0
};
SRowMerger
merge
=
{
0
};
STSRow
*
pTSRow
=
NULL
;
STSRow
*
pTSRow
=
NULL
;
...
@@ -2316,7 +2147,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
...
@@ -2316,7 +2147,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
if
(
ik
.
ts
==
key
)
{
if
(
ik
.
ts
==
key
)
{
tRowMerge
(
&
merge
,
piRow
);
tRowMerge
(
&
merge
,
piRow
);
...
@@ -2370,7 +2201,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
...
@@ -2370,7 +2201,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
// imem & mem are all empty
// imem & mem are all empty
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendOneRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
);
doAppendOneRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
);
}
}
...
@@ -2378,16 +2209,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
...
@@ -2378,16 +2209,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
buildComposedDataBlock
(
STsdbReader
*
pReader
,
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
static
int32_t
buildComposedDataBlock
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
STableBlockScanInfo
*
pBlockScanInfo
)
{
SSDataBlock
*
pResBlock
=
pReader
->
pResBlock
;
SSDataBlock
*
pResBlock
=
pReader
->
pResBlock
;
while
(
1
)
{
while
(
1
)
{
buildComposedDataBlockImpl
(
pReader
,
p
FBlock
,
pBlock
,
p
BlockScanInfo
);
buildComposedDataBlockImpl
(
pReader
,
pBlockScanInfo
);
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
if
(
pBlockInfo
->
tbBlockIdx
==
pFBlock
->
tbBlockIdx
)
{
// still in the same file block now
SBlock
*
pBlock
=
taosArrayGet
(
pBlockScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
// currently loaded file data block is consumed
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
setBlockAllDumped
(
pDumpInfo
,
pBlock
,
pReader
->
order
);
setBlockAllDumped
(
pDumpInfo
,
pBlock
,
pReader
->
order
);
break
;
break
;
...
@@ -2396,16 +2228,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo*
...
@@ -2396,16 +2228,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo*
if
(
pResBlock
->
info
.
rows
>=
pReader
->
capacity
)
{
if
(
pResBlock
->
info
.
rows
>=
pReader
->
capacity
)
{
break
;
break
;
}
}
}
else
{
// todo traverse to next file due to time window overlap
if
(
pResBlock
->
info
.
rows
>=
pReader
->
capacity
)
{
ASSERT
(
0
);
return
TSDB_CODE_SUCCESS
;
}
}
}
}
pResBlock
->
info
.
uid
=
pBlockScanInfo
->
uid
;
pResBlock
->
info
.
uid
=
pBlockScanInfo
->
uid
;
blockDataUpdateTsWindow
(
pResBlock
,
0
);
setComposedBlockFlag
(
pReader
,
true
);
setComposedBlockFlag
(
pReader
,
true
);
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2549,7 +2381,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
...
@@ -2549,7 +2381,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
}
// build composed data block
// build composed data block
code
=
buildComposedDataBlock
(
pReader
,
p
FBlock
,
pBlock
,
p
ScanInfo
);
code
=
buildComposedDataBlock
(
pReader
,
pScanInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
...
@@ -2653,7 +2485,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
...
@@ -2653,7 +2485,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
else
{
}
else
{
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pFBlock
->
uid
,
sizeof
(
pFBlock
->
uid
));
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pFBlock
->
uid
,
sizeof
(
pFBlock
->
uid
));
SBlock
*
pBlock
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
// current block are exhausted, try the next file block
// current block are exhausted, try the next file block
if
(
pReader
->
status
.
fBlockDumpInfo
.
allDumped
)
{
if
(
pReader
->
status
.
fBlockDumpInfo
.
allDumped
)
{
...
@@ -2673,7 +2504,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
...
@@ -2673,7 +2504,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return
code
;
return
code
;
}
}
}
else
{
}
else
{
code
=
buildComposedDataBlock
(
pReader
,
p
FBlock
,
pBlock
,
p
ScanInfo
);
code
=
buildComposedDataBlock
(
pReader
,
pScanInfo
);
return
code
;
return
code
;
}
}
}
}
...
@@ -2815,10 +2646,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
...
@@ -2815,10 +2646,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
//1. find the next neighbor block in the scan block list
//1. find the next neighbor block in the scan block list
SFileDataBlockInfo
fb
=
{.
uid
=
pFBlock
->
uid
,
.
tbBlockIdx
=
nextIndex
};
SFileDataBlockInfo
fb
=
{.
uid
=
pFBlock
->
uid
,
.
tbBlockIdx
=
nextIndex
};
int32_t
neighborIndex
=
findFileBlockInfoIndex
(
&
pStatus
->
b
lockIter
,
&
fb
);
int32_t
neighborIndex
=
findFileBlockInfoIndex
(
pB
lockIter
,
&
fb
);
//2. remove it from the scan block list
//2. remove it from the scan block list
setFileBlockActiveInBlockIter
(
&
pStatus
->
blockIter
,
neighborIndex
);
setFileBlockActiveInBlockIter
(
pBlockIter
,
neighborIndex
,
step
);
//3. load the neighbor block, and set it to be the currently accessed file data block
//3. load the neighbor block, and set it to be the currently accessed file data block
int32_t
code
=
doLoadFileBlockData
(
pReader
,
pBlockIter
,
pScanInfo
,
&
pStatus
->
fileBlockData
);
int32_t
code
=
doLoadFileBlockData
(
pReader
,
pBlockIter
,
pScanInfo
,
&
pStatus
->
fileBlockData
);
...
@@ -2840,17 +2671,16 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
...
@@ -2840,17 +2671,16 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
)
{
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
int32_t
step
=
asc
?
1
:
-
1
;
int64_t
key
=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
];
int64_t
key
=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
];
int32_t
step
=
asc
?
1
:
-
1
;
if
(
asc
)
{
if
(
asc
)
{
pDumpInfo
->
rowIndex
+=
step
;
pDumpInfo
->
rowIndex
+=
step
;
if
(
pDumpInfo
->
rowIndex
<
pBlockData
->
nRow
-
1
)
{
if
(
pDumpInfo
->
rowIndex
<
=
pBlockData
->
nRow
-
1
)
{
pDumpInfo
->
rowIndex
=
doMergeRowsInFileBlockImpl
(
pBlockData
,
pDumpInfo
->
rowIndex
,
key
,
pMerger
,
&
pReader
->
verRange
,
step
);
pDumpInfo
->
rowIndex
=
doMergeRowsInFileBlockImpl
(
pBlockData
,
pDumpInfo
->
rowIndex
,
key
,
pMerger
,
&
pReader
->
verRange
,
step
);
}
}
...
@@ -2858,7 +2688,10 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
...
@@ -2858,7 +2688,10 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
if
(
pDumpInfo
->
rowIndex
>=
pBlockData
->
nRow
&&
asc
)
{
if
(
pDumpInfo
->
rowIndex
>=
pBlockData
->
nRow
&&
asc
)
{
while
(
1
)
{
while
(
1
)
{
CHECK_FILEBLOCK_STATE
st
;
CHECK_FILEBLOCK_STATE
st
;
checkForNeighborFileBlock
(
pReader
,
pScanInfo
,
pBlock
,
pFBlock
,
pMerger
,
key
,
&
st
);
SFileDataBlockInfo
*
pFileBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlock
*
pCurrentBlock
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pFileBlockInfo
->
tbBlockIdx
);
checkForNeighborFileBlock
(
pReader
,
pScanInfo
,
pCurrentBlock
,
pFileBlockInfo
,
pMerger
,
key
,
&
st
);
if
(
st
==
CHECK_FILEBLOCK_QUIT
)
{
if
(
st
==
CHECK_FILEBLOCK_QUIT
)
{
break
;
break
;
}
}
...
@@ -2876,7 +2709,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
...
@@ -2876,7 +2709,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
checkU
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
)
{
void
u
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
)
{
int32_t
sversion
=
TSDBROW_SVERSION
(
pRow
);
int32_t
sversion
=
TSDBROW_SVERSION
(
pRow
);
if
(
pReader
->
pSchema
==
NULL
)
{
if
(
pReader
->
pSchema
==
NULL
)
{
...
@@ -2891,7 +2724,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has
...
@@ -2891,7 +2724,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has
SRowMerger
merge
=
{
0
};
SRowMerger
merge
=
{
0
};
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
checkU
pdateSchema
(
pRow
,
uid
,
pReader
);
u
pdateSchema
(
pRow
,
uid
,
pReader
);
tRowMergerInit
(
&
merge
,
pRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
pRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInBuf
(
dIter
,
hasVal
,
k
.
ts
,
&
merge
,
pReader
);
doLoadRowsOfIdenticalTsInBuf
(
dIter
,
hasVal
,
k
.
ts
,
&
merge
,
pReader
);
...
@@ -2905,7 +2738,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
...
@@ -2905,7 +2738,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
ASSERT
(
k
.
ts
==
ik
.
ts
);
ASSERT
(
k
.
ts
==
ik
.
ts
);
checkU
pdateSchema
(
piRow
,
pBlockScanInfo
->
uid
,
pReader
);
u
pdateSchema
(
piRow
,
pBlockScanInfo
->
uid
,
pReader
);
tRowMergerInit
(
&
merge
,
piRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
piRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInBuf
(
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
ik
.
ts
,
&
merge
,
pReader
);
doLoadRowsOfIdenticalTsInBuf
(
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
ik
.
ts
,
&
merge
,
pReader
);
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
9fab97a0
...
@@ -1139,9 +1139,55 @@ _err:
...
@@ -1139,9 +1139,55 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbReadBlockS
MA
(
SDataFReader
*
pReader
,
SBlockSMA
*
pBlkSMA
)
{
int32_t
tsdbReadBlockS
ma
(
SDataFReader
*
pReader
,
SBlock
*
pBlock
,
SArray
*
aColumnDataAgg
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
// TODO
TdFilePtr
pFD
=
pReader
->
pSmaFD
;
int64_t
offset
=
pBlock
->
aSubBlock
[
0
].
offset
;
int64_t
size
=
pBlock
->
aSubBlock
[
0
].
nSma
*
sizeof
(
SColumnDataAgg
)
+
sizeof
(
TSCKSUM
);
uint8_t
*
pBuf
=
NULL
;
int64_t
n
;
ASSERT
(
tBlockHasSma
(
pBlock
));
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
// lseek
n
=
taosLSeekFile
(
pFD
,
offset
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// read
n
=
taosReadFile
(
pFD
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
NULL
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
taosArrayClear
(
aColumnDataAgg
);
for
(
int32_t
iSma
=
0
;
iSma
<
pBlock
->
aSubBlock
[
0
].
nSma
;
iSma
++
)
{
if
(
taosArrayPush
(
aColumnDataAgg
,
&
((
SColumnDataAgg
*
)(
*
ppBuf
))[
iSma
])
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
tsdbFree
(
pBuf
);
return
code
;
_err:
tsdbError
(
"vgId:%d read block sma failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
}
}
...
@@ -1699,24 +1745,7 @@ _err:
...
@@ -1699,24 +1745,7 @@ _err:
return
code
;
return
code
;
}
}
static
void
tsdbCalcColDataSMA
(
SColData
*
pColData
,
SColumnDataAgg
*
pColAgg
)
{
static
int32_t
tsdbWriteBlockSma
(
TdFilePtr
pFD
,
SBlockData
*
pBlockData
,
SSubBlock
*
pSubBlock
,
uint8_t
**
ppBuf
)
{
SColVal
colVal
;
SColVal
*
pColVal
=
&
colVal
;
*
pColAgg
=
(
SColumnDataAgg
){.
colId
=
pColData
->
cid
};
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
tColDataGetValue
(
pColData
,
iVal
,
pColVal
);
if
(
pColVal
->
isNone
||
pColVal
->
isNull
)
{
pColAgg
->
numOfNull
++
;
}
else
{
// TODO:
ASSERT
(
0
);
}
}
}
static
int32_t
tsdbWriteBlockSMA
(
TdFilePtr
pFD
,
SBlockData
*
pBlockData
,
SSubBlock
*
pSubBlock
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
n
;
int64_t
n
;
SColData
*
pColData
;
SColData
*
pColData
;
...
@@ -1843,7 +1872,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1843,7 +1872,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if
(
pBlock
->
nSubBlock
>
1
||
pBlock
->
last
||
pBlock
->
hasDup
)
goto
_exit
;
if
(
pBlock
->
nSubBlock
>
1
||
pBlock
->
last
||
pBlock
->
hasDup
)
goto
_exit
;
code
=
tsdbWriteBlockS
MA
(
pWriter
->
pSmaFD
,
pBlockData
,
pSubBlock
,
ppBuf1
);
code
=
tsdbWriteBlockS
ma
(
pWriter
->
pSmaFD
,
pBlockData
,
pSubBlock
,
ppBuf1
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
if
(
pSubBlock
->
nSma
>
0
)
{
if
(
pSubBlock
->
nSma
>
0
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
9fab97a0
...
@@ -353,6 +353,14 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
...
@@ -353,6 +353,14 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
return
0
;
return
0
;
}
}
bool
tBlockHasSma
(
SBlock
*
pBlock
)
{
if
(
pBlock
->
nSubBlock
>
1
)
return
false
;
if
(
pBlock
->
last
)
return
false
;
if
(
pBlock
->
hasDup
)
return
false
;
return
pBlock
->
aSubBlock
[
0
].
nSma
>
0
;
}
// SBlockCol ======================================================
// SBlockCol ======================================================
int32_t
tPutBlockCol
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
tPutBlockCol
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
...
@@ -769,29 +777,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
...
@@ -769,29 +777,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
return
code
;
return
code
;
}
}
// KEYINFO ======================================================
int32_t
tPutKEYINFO
(
uint8_t
*
p
,
KEYINFO
*
pKeyInfo
)
{
int32_t
n
=
0
;
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pKeyInfo
->
minKey
);
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pKeyInfo
->
maxKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pKeyInfo
->
minVerion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pKeyInfo
->
maxVersion
);
return
n
;
}
int32_t
tGetKEYINFO
(
uint8_t
*
p
,
KEYINFO
*
pKeyInfo
)
{
int32_t
n
=
0
;
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pKeyInfo
->
minKey
);
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pKeyInfo
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pKeyInfo
->
minVerion
);
n
+=
tGetI64v
(
p
+
n
,
&
pKeyInfo
->
maxVersion
);
return
n
;
}
// SColData ========================================
// SColData ========================================
void
tColDataReset
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
)
{
void
tColDataReset
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
)
{
pColData
->
cid
=
cid
;
pColData
->
cid
=
cid
;
...
@@ -1192,3 +1177,63 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
...
@@ -1192,3 +1177,63 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
*
ppColData
=
*
(
SColData
**
)
p
;
*
ppColData
=
*
(
SColData
**
)
p
;
}
}
}
}
// ALGORITHM ==============================
void
tsdbCalcColDataSMA
(
SColData
*
pColData
,
SColumnDataAgg
*
pColAgg
)
{
SColVal
colVal
;
SColVal
*
pColVal
=
&
colVal
;
*
pColAgg
=
(
SColumnDataAgg
){.
colId
=
pColData
->
cid
};
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
tColDataGetValue
(
pColData
,
iVal
,
pColVal
);
if
(
pColVal
->
isNone
||
pColVal
->
isNull
)
{
pColAgg
->
numOfNull
++
;
}
else
{
switch
(
pColData
->
type
)
{
case
TSDB_DATA_TYPE_NULL
:
break
;
case
TSDB_DATA_TYPE_BOOL
:
break
;
case
TSDB_DATA_TYPE_TINYINT
:
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
break
;
case
TSDB_DATA_TYPE_INT
:
break
;
case
TSDB_DATA_TYPE_BIGINT
:
break
;
case
TSDB_DATA_TYPE_FLOAT
:
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
break
;
case
TSDB_DATA_TYPE_NCHAR
:
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
break
;
case
TSDB_DATA_TYPE_UINT
:
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
break
;
case
TSDB_DATA_TYPE_JSON
:
break
;
case
TSDB_DATA_TYPE_VARBINARY
:
break
;
case
TSDB_DATA_TYPE_DECIMAL
:
break
;
case
TSDB_DATA_TYPE_BLOB
:
break
;
case
TSDB_DATA_TYPE_MEDIUMBLOB
:
break
;
default:
ASSERT
(
0
);
}
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录