Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7deee1c4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7deee1c4
编写于
3月 02, 2023
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: last cache read problem after schema change
上级
0fe9b5ca
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
76 addition
and
22 deletion
+76
-22
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+1
-0
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+72
-21
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+3
-1
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
7deee1c4
...
...
@@ -766,6 +766,7 @@ typedef struct SCacheRowsReader {
TdThreadMutex
readerMutex
;
SVnode
*
pVnode
;
STSchema
*
pSchema
;
STSchema
*
pCurrSchema
;
uint64_t
uid
;
uint64_t
suid
;
char
**
transferBuf
;
// todo remove it soon
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
7deee1c4
...
...
@@ -1390,17 +1390,56 @@ _err:
return
code
;
}
static
int32_t
mergeLast
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
SArray
**
ppLastArray
,
SCacheRowsReader
*
pr
)
{
int32_t
code
=
0
;
static
int32_t
cloneTSchema
(
STSchema
*
pSrc
,
STSchema
**
ppDst
)
{
int32_t
len
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
pSrc
->
numOfCols
;
*
ppDst
=
taosMemoryMalloc
(
len
);
if
(
NULL
==
*
ppDst
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
memcpy
(
*
ppDst
,
pSrc
,
len
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
updateTSchema
(
int32_t
sversion
,
SCacheRowsReader
*
pReader
,
uint64_t
uid
)
{
if
(
NULL
==
pReader
->
pCurrSchema
&&
sversion
==
pReader
->
pSchema
->
version
)
{
return
cloneTSchema
(
pReader
->
pSchema
,
&
pReader
->
pCurrSchema
);
}
if
(
NULL
!=
pReader
->
pCurrSchema
&&
sversion
==
pReader
->
pCurrSchema
->
version
)
{
return
TSDB_CODE_SUCCESS
;
}
taosMemoryFreeClear
(
pReader
->
pCurrSchema
);
return
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
sversion
,
&
pReader
->
pCurrSchema
);
}
static
int32_t
initLastColArray
(
STSchema
*
pTSchema
,
SArray
**
ppColArray
)
{
SArray
*
pColArray
=
taosArrayInit
(
pTSchema
->
numOfCols
,
sizeof
(
SLastCol
));
if
(
NULL
==
pColArray
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
pTSchema
->
numOfCols
;
++
i
)
{
SLastCol
col
=
{.
ts
=
0
,
.
colVal
=
COL_VAL_NULL
(
pTSchema
->
columns
[
i
].
colId
,
pTSchema
->
columns
[
i
].
type
)};
taosArrayPush
(
pColArray
,
&
col
);
}
*
ppColArray
=
pColArray
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mergeLast
(
tb_uid_t
uid
,
STsdb
*
pTsdb
,
SArray
**
ppLastArray
,
SCacheRowsReader
*
pr
)
{
STSchema
*
pTSchema
=
pr
->
pSchema
;
// metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t
nCol
=
pTSchema
->
numOfCols
;
int16_t
iCol
=
0
;
int16_t
nLastCol
=
pTSchema
->
numOfCols
;
int16_t
noneCol
=
0
;
bool
setNoneCol
=
false
;
SArray
*
pColArray
=
taosArrayInit
(
nCol
,
sizeof
(
SLastCol
))
;
SArray
*
pColArray
=
NULL
;
SColVal
*
pColVal
=
&
(
SColVal
){
0
};
int32_t
code
=
initLastColArray
(
pTSchema
,
&
pColArray
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
TSKEY
lastRowTs
=
TSKEY_MAX
;
CacheNextRowIter
iter
=
{
0
};
...
...
@@ -1415,6 +1454,13 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
break
;
}
code
=
updateTSchema
(
TSDBROW_SVERSION
(
pRow
),
pr
,
uid
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
goto
_err
;
}
pTSchema
=
pr
->
pCurrSchema
;
int16_t
nCol
=
pTSchema
->
numOfCols
;
TSKEY
rowTs
=
TSDBROW_TS
(
pRow
);
if
(
lastRowTs
==
TSKEY_MAX
)
{
...
...
@@ -1422,28 +1468,27 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
STColumn
*
pTColumn
=
&
pTSchema
->
columns
[
0
];
*
pColVal
=
COL_VAL_VALUE
(
pTColumn
->
colId
,
pTColumn
->
type
,
(
SValue
){.
val
=
lastRowTs
});
if
(
taosArrayPush
(
pColArray
,
&
(
SLastCol
){.
ts
=
lastRowTs
,
.
colVal
=
*
pColVal
})
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
taosArraySet
(
pColArray
,
0
,
&
(
SLastCol
){.
ts
=
lastRowTs
,
.
colVal
=
*
pColVal
});
for
(
iCol
=
1
;
iCol
<
nCol
;
++
iCol
)
{
for
(
int16_t
iCol
=
1
;
iCol
<
nCol
;
++
iCol
)
{
if
(
iCol
>=
nLastCol
)
{
break
;
}
SLastCol
*
pCol
=
taosArrayGet
(
pColArray
,
iCol
);
if
(
pCol
->
colVal
.
cid
!=
pTSchema
->
columns
[
iCol
].
colId
)
{
continue
;
}
tsdbRowGetColVal
(
pRow
,
pTSchema
,
iCol
,
pColVal
);
SLastCol
lastCol
=
{.
ts
=
lastRowTs
,
.
colVal
=
*
pColVal
};
*
pCol
=
(
SLastCol
)
{.
ts
=
lastRowTs
,
.
colVal
=
*
pColVal
};
if
(
IS_VAR_DATA_TYPE
(
pColVal
->
type
)
&&
pColVal
->
value
.
nData
>
0
)
{
lastCol
.
colVal
.
value
.
pData
=
taosMemoryMalloc
(
lastCol
.
colVal
.
value
.
nData
);
if
(
lastCol
.
colVal
.
value
.
pData
==
NULL
)
{
pCol
->
colVal
.
value
.
pData
=
taosMemoryMalloc
(
pCol
->
colVal
.
value
.
nData
);
if
(
pCol
->
colVal
.
value
.
pData
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
memcpy
(
lastCol
.
colVal
.
value
.
pData
,
pColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
}
if
(
taosArrayPush
(
pColArray
,
&
lastCol
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
memcpy
(
pCol
->
colVal
.
value
.
pData
,
pColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
}
if
(
!
COL_VAL_IS_VALUE
(
pColVal
)
&&
!
setNoneCol
)
{
...
...
@@ -1461,9 +1506,15 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
// merge into pColArray
setNoneCol
=
false
;
for
(
iCol
=
noneCol
;
iCol
<
nCol
;
++
iCol
)
{
for
(
int16_t
iCol
=
noneCol
;
iCol
<
nCol
;
++
iCol
)
{
if
(
iCol
>=
nLastCol
)
{
break
;
}
// high version's column value
SLastCol
*
lastColVal
=
(
SLastCol
*
)
taosArrayGet
(
pColArray
,
iCol
);
if
(
lastColVal
->
colVal
.
cid
!=
pTSchema
->
columns
[
iCol
].
colId
)
{
continue
;
}
SColVal
*
tColVal
=
&
lastColVal
->
colVal
;
tsdbRowGetColVal
(
pRow
,
pTSchema
,
iCol
,
pColVal
);
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
7deee1c4
...
...
@@ -209,6 +209,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree
(
p
->
pSchema
);
}
taosMemoryFree
(
p
->
pCurrSchema
);
destroyLastBlockLoadInfo
(
p
->
pLoadInfo
);
taosMemoryFree
((
void
*
)
p
->
idstr
);
...
...
@@ -303,7 +305,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for
(
int32_t
i
=
0
;
i
<
pr
->
pSchema
->
numOfCols
;
++
i
)
{
struct
STColumn
*
pCol
=
&
pr
->
pSchema
->
columns
[
i
];
SLastCol
p
=
{.
ts
=
INT64_MIN
,
.
colVal
.
type
=
pCol
->
type
};
SLastCol
p
=
{.
ts
=
INT64_MIN
,
.
colVal
.
type
=
pCol
->
type
,
.
colVal
.
flag
=
CV_FLAG_NULL
};
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
p
.
colVal
.
value
.
pData
=
taosMemoryCalloc
(
pCol
->
bytes
,
sizeof
(
char
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录