Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
64554461
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
64554461
编写于
6月 17, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
9bc64e80
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
178 addition
and
117 deletion
+178
-117
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+78
-18
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+100
-99
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
64554461
...
...
@@ -398,26 +398,81 @@ _err:
return
code
;
}
static
int32_t
tsdbMergeCommitImpl
(
SCommitter
*
pCommitter
,
SBlockIdx
*
pBlockIdx
,
STbDataIter
*
pIter
,
SBlock
*
pBlock
)
{
int32_t
code
=
0
;
int32_t
nRow
=
0
;
SBlock
block
=
BLOCK_INIT_VAL
;
static
int32_t
tsdbGetOverlapRowNumber
(
STbDataIter
*
pIter
,
SBlock
*
pBlock
)
{
int32_t
nRow
=
0
;
TSDBROW
*
pRow
;
TSDBKEY
key
;
int32_t
c
=
0
;
STbDataIter
iter
=
*
pIter
;
iter
.
pRow
=
NULL
;
while
(
true
)
{
pRow
=
tsdbTbDataIterGet
(
pIter
);
if
(
pBlock
->
last
)
{
// load last and merge until {pCommitter->maxKey, INT64_MAX}
}
else
{
// scan pIter, check how many rows in the block range
if
(
pBlock
->
nRow
+
nRow
<=
pCommitter
->
maxRow
)
{
if
(
pBlock
->
nSubBlock
<
TSDB_MAX_SUBBLOCKS
)
{
// add as a subblock
if
(
pRow
==
NULL
)
break
;
key
=
tsdbRowKey
(
pRow
);
c
=
tBlockCmprFn
(
&
(
SBlock
){.
info
.
maxKey
=
key
,
.
info
.
minKey
=
key
},
pBlock
);
if
(
c
==
0
)
{
nRow
++
;
}
else
if
(
c
>
0
)
{
break
;
}
else
{
ASSERT
(
0
);
}
}
return
nRow
;
}
static
int32_t
tsdbMergeCommitImpl
(
SCommitter
*
pCommitter
,
SBlockIdx
*
pBlockIdx
,
STbDataIter
*
pIter
,
SBlock
*
pBlock
,
int8_t
toDataOnly
)
{
int32_t
code
=
0
;
int32_t
iRow
=
0
;
int32_t
nRow
=
0
;
int32_t
c
;
TSDBROW
*
pRow
;
SBlock
block
=
BLOCK_INIT_VAL
;
TSDBKEY
key1
;
TSDBKEY
key2
;
tsdbBlockDataClear
(
&
pCommitter
->
bDataN
);
// load last and merge until {pCommitter->maxKey, INT64_MAX}
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
pBlockIdx
,
pBlock
,
&
pCommitter
->
bDataO
,
NULL
,
0
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
iRow
=
0
;
nRow
=
pCommitter
->
bDataO
.
nRow
;
pRow
=
tsdbTbDataIterGet
(
pIter
);
while
(
true
)
{
if
((
pRow
==
NULL
||
pRow
->
pTSRow
->
ts
>
pCommitter
->
maxKey
)
&&
(
iRow
>=
nRow
))
{
if
(
pCommitter
->
bDataN
.
nRow
>
0
)
{
goto
_write_block_data
;
}
else
{
// load the block, merge until pBlock->maxKey
break
;
}
}
else
{
// load the block, merge until pBlock->maxKey
}
// TODO
_write_block_data:
block
.
last
=
pCommitter
->
bDataN
.
nRow
<
pCommitter
->
minRow
?
1
:
0
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
&
pCommitter
->
bDataN
,
NULL
,
pBlockIdx
,
&
block
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlock
,
&
block
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
block
=
BLOCK_INIT_VAL
;
tsdbBlockDataClear
(
&
pCommitter
->
bDataN
);
return
code
;
_err:
tsdbError
(
"vgId:%d merge commit impl failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
...
...
@@ -436,7 +491,7 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
if
(
code
)
goto
_err
;
}
else
if
(
pBlock
->
last
)
{
// merge
code
=
tsdbMergeCommitImpl
(
pCommitter
,
pBlockIdx
,
pIter
,
pBlock
);
code
=
tsdbMergeCommitImpl
(
pCommitter
,
pBlockIdx
,
pIter
,
pBlock
,
0
);
if
(
code
)
goto
_err
;
}
else
{
// memory
...
...
@@ -456,9 +511,14 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
else
if
(
c
==
0
)
{
// merge
code
=
tsdbMergeCommitImpl
(
pCommitter
,
pBlockIdx
,
pIter
,
pBlock
);
if
(
code
)
goto
_err
;
int32_t
nOverlap
=
tsdbGetOverlapRowNumber
(
pIter
,
pBlock
);
if
(
pBlock
->
nRow
+
nOverlap
>
pCommitter
->
maxRow
||
pBlock
->
nSubBlock
==
TSDB_MAX_SUBBLOCKS
)
{
code
=
tsdbMergeCommitImpl
(
pCommitter
,
pBlockIdx
,
pIter
,
pBlock
,
1
);
if
(
code
)
goto
_err
;
}
else
{
// add as a subblock
}
}
else
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
64554461
...
...
@@ -113,7 +113,7 @@ struct STsdbReader {
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t
numOfBlocks
;
SArray
*
pColumns
;
//
column list, SColumnInfoData array list
SArray
*
pColumns
;
//
SArray<SColumnInfoData>
bool
locateStart
;
int32_t
outputCapacity
;
int32_t
realNumOfRows
;
...
...
@@ -180,42 +180,43 @@ struct STsdbReader {
// return pLocalIdList;
// }
//
static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) {
// size_t tableSize = taosArrayGetSize(pTableList->pTableList);
// assert(tableSize >= 1);
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReader
*
pTsdbReadHandle
,
STableListInfo
*
pTableList
)
{
// size_t tableSize = taosArrayGetSize(pTableList->pTableList);
// assert(tableSize >= 1);
// // allocate buffer in order to load data blocks from file
// SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
// if (pTableCheckInfo == NULL) {
// return NULL;
// }
// // allocate buffer in order to load data blocks from file
// SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
// if (pTableCheckInfo == NULL) {
// return NULL;
// }
// // todo apply the lastkey of table check to avoid to load header file
// for (int32_t j = 0; j < tableSize; ++j) {
// STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
// // todo apply the lastkey of table check to avoid to load header file
// for (int32_t j = 0; j < tableSize; ++j) {
// STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
// STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
// info.suid = pTsdbReadHandle->suid;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
// info.lastKey = pTsdbReadHandle->window.skey;
// }
// STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
// info.suid = pTsdbReadHandle->suid;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
// info.lastKey = pTsdbReadHandle->window.skey;
// }
// assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
// } else {
// info.lastKey = pTsdbReadHandle->window.skey;
// }
// assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
// } else {
// info.lastKey = pTsdbReadHandle->window.skey;
// }
// taosArrayPush(pTableCheckInfo, &info);
// tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
// info.lastKey,
// pTsdbReadHandle->idStr);
// }
// taosArrayPush(pTableCheckInfo, &info);
// tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
// info.lastKey,
// pTsdbReadHandle->idStr);
// }
// // TODO group table according to the tag value.
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
// return pTableCheckInfo;
// }
// // TODO group table according to the tag value.
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
// return pTableCheckInfo;
return
NULL
;
}
// static void resetCheckInfo(STsdbReader* pTsdbReadHandle) {
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
...
...
@@ -266,31 +267,31 @@ struct STsdbReader {
// return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
// }
// static void setQueryTimewindow(STsdbReader* pTsdbReadHandle
, SQueryTableDataCond* pCond, int32_t tWinIdx) {
// pTsdbReadHandle
->window = pCond->twindows[tWinIdx];
static
void
setQueryTimewindow
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
,
int32_t
tWinIdx
)
{
// pReader
->window = pCond->twindows[tWinIdx];
//
bool updateTs = false;
// int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle
->pTsdb);
// if (ASCENDING_TRAVERSE(pTsdbReadHandle
->order)) {
// if (startTs > pTsdbReadHandle
->window.skey) {
// pTsdbReadHandle
->window.skey = startTs;
//
pCond->twindows[tWinIdx].skey = startTs;
//
updateTs = true;
//
}
//
} else {
// if (startTs > pTsdbReadHandle
->window.ekey) {
// pTsdbReadHandle
->window.ekey = startTs;
//
pCond->twindows[tWinIdx].ekey = startTs;
//
updateTs = true;
//
}
//
}
//
bool updateTs = false;
// int64_t startTs = getEarliestValidTimestamp(pReader
->pTsdb);
// if (ASCENDING_TRAVERSE(pReader
->order)) {
// if (startTs > pReader
->window.skey) {
// pReader
->window.skey = startTs;
//
pCond->twindows[tWinIdx].skey = startTs;
//
updateTs = true;
//
}
//
} else {
// if (startTs > pReader
->window.ekey) {
// pReader
->window.ekey = startTs;
//
pCond->twindows[tWinIdx].ekey = startTs;
//
updateTs = true;
//
}
//
}
//
if (updateTs) {
//
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
// pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].e
key,
// pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle
->idStr);
//
}
//
}
//
if (updateTs) {
//
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
// pReader, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pReader->window.s
key,
// pReader->window.ekey, pReader
->idStr);
//
}
}
static
int32_t
tsdbReaderCreate
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
)
{
...
...
@@ -326,50 +327,50 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint
// // goto _end;
// // }
// setQueryTimewindow(pReadHandle
, pCond, 0);
//
if (pCond->numOfCols > 0) {
// int32_t rowLen = 0;
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// rowLen += pCond->colList[i].bytes;
// }
// // make sure the output SSDataBlock size be less than 2MB.
// int32_t TWOMB = 2 * 1024 * 1024;
// if (pReadHandle->outputCapacity * rowLen > TWOMB) {
// pReadHandle->outputCapacity = TWOMB / rowLen;
// }
// // allocate buffer in order to load data blocks from file
// pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
// if (pReadHandle->suppInfo.pstatis == NULL) {
// goto _end;
// }
// // todo: use list instead of array?
// pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
// if (pReadHandle->pColumns == NULL) {
// goto _end;
// }
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// SColumnInfoData colInfo = {{0}, 0};
// colInfo.info = pCond->colList[i];
// int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
// if (code != TSDB_CODE_SUCCESS) {
// goto _end;
// }
// taosArrayPush(pReadHandle->pColumns, &colInfo);
// }
// pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
// size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
// pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
//
}
setQueryTimewindow
(
pReader
,
pCond
,
0
);
if
(
pCond
->
numOfCols
>
0
)
{
// int32_t rowLen = 0;
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// rowLen += pCond->colList[i].bytes;
// }
// // make sure the output SSDataBlock size be less than 2MB.
// int32_t TWOMB = 2 * 1024 * 1024;
// if (pReadHandle->outputCapacity * rowLen > TWOMB) {
// pReadHandle->outputCapacity = TWOMB / rowLen;
// }
// // allocate buffer in order to load data blocks from file
// pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
// if (pReadHandle->suppInfo.pstatis == NULL) {
// goto _end;
// }
// // todo: use list instead of array?
// pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
// if (pReadHandle->pColumns == NULL) {
// goto _end;
// }
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// SColumnInfoData colInfo = {{0}, 0};
// colInfo.info = pCond->colList[i];
// int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
// if (code != TSDB_CODE_SUCCESS) {
// goto _end;
// }
// taosArrayPush(pReadHandle->pColumns, &colInfo);
// }
// pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
// size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
// pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
}
// pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
// if (pReadHandle->pDataCols == NULL) {
...
...
@@ -378,7 +379,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint
// goto _end;
// }
//
tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
// tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
// tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
// return (STsdbReader*)pReadHandle;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录