Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
84aac9e3
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
84aac9e3
编写于
6月 07, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
6月 07, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13563 from taosdata/feat/row_refact
feat: vnode multi version
上级
58ff4429
fa8c6f26
变更
17
展开全部
隐藏空白更改
内联
并排
Showing
17 changed file
with
911 addition
and
530 deletion
+911
-530
include/common/tcommon.h
include/common/tcommon.h
+1
-0
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+2
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+109
-88
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+3
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+60
-62
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+55
-63
source/dnode/vnode/src/tsdb/tsdbCommit2.c
source/dnode/vnode/src/tsdb/tsdbCommit2.c
+19
-19
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+1
-2
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+517
-155
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
+18
-23
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+63
-79
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+57
-30
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/parser/inc/parInsertData.h
source/libs/parser/inc/parInsertData.h
+1
-2
未找到文件。
include/common/tcommon.h
浏览文件 @
84aac9e3
...
...
@@ -108,6 +108,7 @@ typedef struct SColumnInfoData {
typedef
struct
SQueryTableDataCond
{
// STimeWindow twindow;
uint64_t
suid
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
84aac9e3
...
...
@@ -36,12 +36,12 @@ target_sources(
# tsdb
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
#
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbMemTable2.c"
#
"src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbWrite.c"
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
84aac9e3
...
...
@@ -39,24 +39,46 @@ typedef struct SDelOp SDelOp;
static
int
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
// tsdbMemTable ==============================================================================================
typedef
struct
STbData
STbData
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
SMergeInfo
SMergeInfo
;
typedef
struct
STable
STable
;
// SMemTable
int32_t
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
);
void
tsdbGetTbDataFromMemTable
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
);
// STbDataIter
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
void
*
tsdbTbDataIterDestroy
(
STbDataIter
*
pIter
);
void
tsdbTbDataIterOpen
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
*
pIter
);
bool
tsdbTbDataIterNext
(
STbDataIter
*
pIter
);
bool
tsdbTbDataIterGet
(
STbDataIter
*
pIter
,
TSDBROW
*
pRow
);
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
// tsdbMemTable2.c ==============================================================================================
typedef
struct
SMemTable
SMemTable
;
typedef
struct
SMemData
SMemData
;
typedef
struct
SMemDataIter
SMemDataIter
;
// typedef struct SMemTable2 SMemTable2
;
//
typedef struct SMemData SMemData;
//
typedef struct SMemDataIter SMemDataIter;
int32_t
tsdbMemTableCreate2
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
void
tsdbMemTableDestroy2
(
SMemTable
*
pMemTable
);
int32_t
tsdbInsertTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
);
int32_t
tsdbDeleteTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
);
// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2
**ppMemTable);
// void tsdbMemTableDestroy2(SMemTable2
*pMemTable);
//
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
//
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
/* SMemDataIter */
void
tsdbMemDataIterOpen
(
SMemData
*
pMemData
,
TSDBKEY
*
pKey
,
int8_t
backward
,
SMemDataIter
*
pIter
);
bool
tsdbMemDataIterNext
(
SMemDataIter
*
pIter
);
void
tsdbMemDataIterGet
(
SMemDataIter
*
pIter
,
TSDBROW
**
ppRow
);
/
/ /
* SMemDataIter */
//
void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
//
bool tsdbMemDataIterNext(SMemDataIter *pIter);
//
void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// tsdbCommit2.c ==============================================================================================
int32_t
tsdbBegin2
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit2
(
STsdb
*
pTsdb
);
//
//
tsdbCommit2.c ==============================================================================================
//
int32_t tsdbBegin2(STsdb *pTsdb);
//
int32_t tsdbCommit2(STsdb *pTsdb);
// tsdbFile.c ==============================================================================================
typedef
int32_t
TSDB_FILE_T
;
...
...
@@ -124,17 +146,6 @@ int tsdbRLockFS(STsdbFS *pFs);
int
tsdbWLockFS
(
STsdbFS
*
pFs
);
int
tsdbUnLockFS
(
STsdbFS
*
pFs
);
// tsdbMemTable ================
typedef
struct
STbData
STbData
;
typedef
struct
STsdbMemTable
STsdbMemTable
;
typedef
struct
SMergeInfo
SMergeInfo
;
typedef
struct
STable
STable
;
int
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
STsdbMemTable
**
ppMemTable
);
void
tsdbMemTableDestroy
(
STsdbMemTable
*
pMemTable
);
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
// structs
typedef
struct
{
int
minFid
;
...
...
@@ -145,30 +156,27 @@ typedef struct {
#define TSDB_DATA_DIR_LEN 6 // adapt accordingly
struct
STsdb
{
char
*
path
;
SVnode
*
pVnode
;
TdThreadMutex
mutex
;
char
dir
[
TSDB_DATA_DIR_LEN
];
bool
repoLocked
;
STsdbKeepCfg
keepCfg
;
S
TsdbMemTable
*
mem
;
S
TsdbMemTable
*
imem
;
SRtn
rtn
;
STsdbFS
*
fs
;
char
*
path
;
SVnode
*
pVnode
;
TdThreadMutex
mutex
;
char
dir
[
TSDB_DATA_DIR_LEN
];
bool
repoLocked
;
STsdbKeepCfg
keepCfg
;
S
MemTable
*
mem
;
S
MemTable
*
imem
;
SRtn
rtn
;
STsdbFS
*
fs
;
};
#if 1 // ======================================
struct
STable
{
uint64_t
t
id
;
uint64_t
su
id
;
uint64_t
uid
;
STSchema
*
pSchema
;
// latest schema
STSchema
*
pCacheSchema
;
// cached cache
};
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
// int tsdbPrepareCommit(STsdb *pTsdb);
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
// .head
...
...
@@ -206,27 +214,44 @@ struct SDFileSet {
SDFile
files
[
TSDB_FILE_MAX
];
};
struct
TSDBKEY
{
int64_t
version
;
TSKEY
ts
;
};
typedef
struct
SMemSkipListNode
SMemSkipListNode
;
struct
SMemSkipListNode
{
int8_t
level
;
SMemSkipListNode
*
forwards
[
0
];
};
typedef
struct
SMemSkipList
{
uint32_t
seed
;
int64_t
size
;
int8_t
maxLevel
;
int8_t
level
;
SMemSkipListNode
*
pHead
;
SMemSkipListNode
*
pTail
;
}
SMemSkipList
;
struct
STbData
{
tb_uid_t
uid
;
TSKEY
keyMin
;
TS
KEY
keyMax
;
int64_t
minVer
;
int64_t
maxVer
;
int64_t
nrows
;
S
SkipList
*
pData
;
tb_uid_t
s
uid
;
tb_uid_t
uid
;
TS
DBKEY
minKey
;
TSDBKEY
maxKey
;
SDelOp
*
pHead
;
SDelOp
*
pTail
;
S
MemSkipList
sl
;
};
struct
STsdbMemTable
{
SVBufPool
*
pPool
;
T_REF_DECLARE
()
SRWLatch
latch
;
TSKEY
keyMin
;
TSKEY
keyMax
;
int64_t
minVer
;
int64_t
maxVer
;
int64_t
nRow
;
SSkipList
*
pSlIdx
;
// SSkiplist<STbData>
SHashObj
*
pHashIdx
;
struct
SMemTable
{
SRWLatch
latch
;
STsdb
*
pTsdb
;
int32_t
nRef
;
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
int64_t
nRow
;
int64_t
nDelOp
;
SArray
*
aTbData
;
// SArray<STbData>
};
struct
STsdbFSMeta
{
...
...
@@ -237,9 +262,11 @@ struct STsdbFSMeta {
// ==================
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
sf
;
// sma data file array v2f1900.index_name_1
STsdbFSMeta
meta
;
// FS meta
SDFile
cacheFile
;
// cache file
SDFile
tombstone
;
// tomestome file
SArray
*
df
;
// data file array
SArray
*
sf
;
// sma data file array v2f1900.index_name_1
}
SFSStatus
;
struct
STsdbFS
{
...
...
@@ -292,16 +319,24 @@ static void *taosTZfree(void *ptr);
static
size_t
taosTSizeof
(
void
*
ptr
);
static
void
taosTMemset
(
void
*
ptr
,
int
c
);
static
FORCE_INLINE
STSRow
*
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
struct
TSDBROW
{
int64_t
version
;
STSRow
*
pTSRow
;
};
static
FORCE_INLINE
STSRow
*
tsdbNextIterRow
(
STbDataIter
*
pIter
)
{
TSDBROW
row
;
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
if
(
tsdbTbDataIterGet
(
pIter
,
&
row
))
{
return
row
.
pTSRow
;
}
return
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
)
;
return
NULL
;
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
S
SkipListIterato
r
*
pIter
)
{
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
S
TbDataIte
r
*
pIter
)
{
STSRow
*
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
...
...
@@ -311,11 +346,6 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
// tsdbReadImpl
typedef
struct
SReadH
SReadH
;
struct
TSDBKEY
{
int64_t
version
;
TSKEY
ts
;
};
typedef
struct
{
uint64_t
suid
;
uint64_t
uid
;
...
...
@@ -354,7 +384,7 @@ typedef struct {
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
t
id
;
uint64_t
su
id
;
uint64_t
uid
;
SBlock
blocks
[];
}
SBlockInfo
;
...
...
@@ -650,11 +680,6 @@ struct SFSIter {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
struct
TSDBROW
{
int64_t
version
;
STSRow2
tsRow
;
};
struct
TABLEID
{
tb_uid_t
suid
;
tb_uid_t
uid
;
...
...
@@ -675,7 +700,7 @@ typedef struct {
TSKEY
eKey
;
}
SDelInfo
;
struct
SMemTable
{
struct
SMemTable
2
{
STsdb
*
pTsdb
;
int32_t
nRef
;
TSDBKEY
minKey
;
...
...
@@ -705,16 +730,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
return
0
;
}
typedef
struct
SMemSkipListNode
SMemSkipListNode
;
typedef
struct
SMemSkipList
{
uint32_t
seed
;
int32_t
size
;
int8_t
maxLevel
;
int8_t
level
;
SMemSkipListNode
*
pHead
;
SMemSkipListNode
*
pTail
;
}
SMemSkipList
;
struct
SMemData
{
tb_uid_t
suid
;
tb_uid_t
uid
;
...
...
@@ -726,13 +741,19 @@ struct SMemData {
};
struct
SMemDataIter
{
S
MemData
*
pMemData
;
S
TbData
*
pMemData
;
int8_t
backward
;
TSDBROW
*
pRow
;
SMemSkipListNode
*
pNode
;
// current node
TSDBROW
row
;
};
struct
STbDataIter
{
STbData
*
pTbData
;
int8_t
backward
;
SMemSkipListNode
*
pNode
;
};
#endif
#ifdef __cplusplus
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
84aac9e3
...
...
@@ -116,7 +116,9 @@ int tsdbBegin(STsdb* pTsdb);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbInsertTableData
(
STsdb
*
pTsdb
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkRsp
*
pRsp
);
int32_t
tsdbInsertTableData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkRsp
*
pRsp
);
int32_t
tsdbDeleteTableData
(
STsdb
*
pTsdb
,
int64_t
version
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
);
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableList
,
uint64_t
qId
,
uint64_t
taskId
);
tsdbReaderT
tsdbQueryCacheLastT
(
STsdb
*
tsdb
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableList
,
uint64_t
qId
,
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
84aac9e3
...
...
@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
}
int
metaGetTableEntryByVersion
(
SMetaReader
*
pReader
,
int64_t
version
,
tb_uid_t
uid
)
{
SMeta
*
pMeta
=
pReader
->
pMeta
;
SMeta
*
pMeta
=
pReader
->
pMeta
;
STbDbKey
tbDbKey
=
{.
version
=
version
,
.
uid
=
uid
};
// query table.db
...
...
@@ -54,7 +54,7 @@ _err:
}
int
metaGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
)
{
SMeta
*
pMeta
=
pReader
->
pMeta
;
SMeta
*
pMeta
=
pReader
->
pMeta
;
int64_t
version
;
// query uid.idx
...
...
@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
}
int
metaGetTableEntryByName
(
SMetaReader
*
pReader
,
const
char
*
name
)
{
SMeta
*
pMeta
=
pReader
->
pMeta
;
SMeta
*
pMeta
=
pReader
->
pMeta
;
tb_uid_t
uid
;
// query name.idx
...
...
@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
}
tb_uid_t
metaGetTableEntryUidByName
(
SMeta
*
pMeta
,
const
char
*
name
)
{
void
*
pData
=
NULL
;
void
*
pData
=
NULL
;
int
nData
=
0
;
tb_uid_t
uid
=
0
;
...
...
@@ -134,7 +134,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
int
metaTbCursorNext
(
SMTbCursor
*
pTbCur
)
{
int
ret
;
void
*
pBuf
;
void
*
pBuf
;
STbCfg
tbCfg
;
for
(;;)
{
...
...
@@ -155,7 +155,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
}
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
)
{
void
*
pData
=
NULL
;
void
*
pData
=
NULL
;
int
nData
=
0
;
int64_t
version
;
SSchemaWrapper
schema
=
{
0
};
...
...
@@ -163,37 +163,47 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
SDecoder
dc
=
{
0
};
metaRLock
(
pMeta
);
if
(
sver
<
0
)
{
if
(
tdbTbGet
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pData
,
&
nData
)
<
0
)
{
goto
_err
;
}
version
=
*
(
int64_t
*
)
pData
;
_query:
if
(
tdbTbGet
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pData
,
&
nData
)
<
0
)
{
goto
_err
;
}
tdbTbGet
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
uid
=
uid
,
.
version
=
version
},
sizeof
(
STbDbKey
),
&
pData
,
&
nData
)
;
version
=
*
(
int64_t
*
)
pData
;
SMetaEntry
me
=
{
0
};
tDecoderInit
(
&
dc
,
pData
,
nData
);
metaDecodeEntry
(
&
dc
,
&
me
);
if
(
me
.
type
==
TSDB_SUPER_TABLE
)
{
tdbTbGet
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
uid
=
uid
,
.
version
=
version
},
sizeof
(
STbDbKey
),
&
pData
,
&
nData
);
SMetaEntry
me
=
{
0
};
tDecoderInit
(
&
dc
,
pData
,
nData
);
metaDecodeEntry
(
&
dc
,
&
me
);
if
(
me
.
type
==
TSDB_SUPER_TABLE
)
{
if
(
sver
==
-
1
||
sver
==
me
.
stbEntry
.
schemaRow
.
version
)
{
pSchema
=
tCloneSSchemaWrapper
(
&
me
.
stbEntry
.
schemaRow
);
}
else
if
(
me
.
type
==
TSDB_NORMAL_TABLE
)
{
pSchema
=
tCloneSSchemaWrapper
(
&
me
.
ntbEntry
.
schemaRow
);
}
else
{
ASSERT
(
0
);
tDecoderClear
(
&
dc
);
goto
_exit
;
}
}
else
if
(
me
.
type
==
TSDB_CHILD_TABLE
)
{
uid
=
me
.
ctbEntry
.
suid
;
tDecoderClear
(
&
dc
);
goto
_query
;
}
else
{
if
(
tdbTbGet
(
pMeta
->
pSkmDb
,
&
(
SSkmDbKey
){.
uid
=
uid
,
.
sver
=
sver
},
sizeof
(
SSkmDbKey
),
&
pData
,
&
nData
)
<
0
)
{
goto
_err
;
if
(
sver
==
-
1
||
sver
==
me
.
ntbEntry
.
schemaRow
.
version
)
{
pSchema
=
tCloneSSchemaWrapper
(
&
me
.
ntbEntry
.
schemaRow
);
tDecoderClear
(
&
dc
);
goto
_exit
;
}
}
tDecoderClear
(
&
dc
);
tDecoderInit
(
&
dc
,
pData
,
nData
);
tDecodeSSchemaWrapper
(
&
dc
,
&
schema
);
pSchema
=
tCloneSSchemaWrapper
(
&
schema
);
tDecoderClear
(
&
dc
);
// query from skm db
if
(
tdbTbGet
(
pMeta
->
pSkmDb
,
&
(
SSkmDbKey
){.
uid
=
uid
,
.
sver
=
sver
},
sizeof
(
SSkmDbKey
),
&
pData
,
&
nData
)
<
0
)
{
goto
_err
;
}
tDecoderInit
(
&
dc
,
pData
,
nData
);
tDecodeSSchemaWrapper
(
&
dc
,
&
schema
);
pSchema
=
tCloneSSchemaWrapper
(
&
schema
);
tDecoderClear
(
&
dc
);
_exit:
metaULock
(
pMeta
);
tdbFree
(
pData
);
return
pSchema
;
...
...
@@ -205,11 +215,11 @@ _err:
}
struct
SMCtbCursor
{
SMeta
*
pMeta
;
TBC
*
pCur
;
SMeta
*
pMeta
;
TBC
*
pCur
;
tb_uid_t
suid
;
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
};
...
...
@@ -279,25 +289,13 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
}
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
)
{
tb_uid_t
quid
;
SMetaReader
mr
=
{
0
};
STSchema
*
pTSchema
=
NULL
;
// SMetaReader mr = {0};
STSchema
*
pTSchema
=
NULL
;
SSchemaWrapper
*
pSW
=
NULL
;
STSchemaBuilder
sb
=
{
0
};
SSchema
*
pSchema
;
metaReaderInit
(
&
mr
,
pMeta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
quid
=
mr
.
me
.
ctbEntry
.
suid
;
}
else
{
quid
=
uid
;
}
metaReaderClear
(
&
mr
);
SSchema
*
pSchema
;
pSW
=
metaGetTableSchema
(
pMeta
,
q
uid
,
sver
,
0
);
pSW
=
metaGetTableSchema
(
pMeta
,
uid
,
sver
,
0
);
if
(
!
pSW
)
return
NULL
;
tdInitTSchemaBuilder
(
&
sb
,
pSW
->
version
);
...
...
@@ -321,11 +319,11 @@ int metaGetTbNum(SMeta *pMeta) {
}
typedef
struct
{
SMeta
*
pMeta
;
TBC
*
pCur
;
SMeta
*
pMeta
;
TBC
*
pCur
;
tb_uid_t
uid
;
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
}
SMSmaCursor
;
...
...
@@ -397,7 +395,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
bool
deepCopy
)
{
STSmaWrapper
*
pSW
=
NULL
;
SArray
*
pSmaIds
=
NULL
;
SArray
*
pSmaIds
=
NULL
;
if
(
!
(
pSmaIds
=
metaGetSmaIdsByTable
(
pMeta
,
uid
)))
{
return
NULL
;
...
...
@@ -421,7 +419,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderInit
(
&
mr
,
pMeta
,
0
);
int64_t
smaId
;
int
smaIdx
=
0
;
STSma
*
pTSma
=
NULL
;
STSma
*
pTSma
=
NULL
;
for
(
int
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
smaId
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pSmaIds
,
i
);
if
(
metaGetTableEntryByUid
(
&
mr
,
smaId
)
<
0
)
{
...
...
@@ -469,7 +467,7 @@ _err:
}
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
STSma
*
pTSma
=
NULL
;
STSma
*
pTSma
=
NULL
;
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
indexUid
)
<
0
)
{
...
...
@@ -491,7 +489,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
}
SArray
*
metaGetSmaIdsByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
SArray
*
pUids
=
NULL
;
SArray
*
pUids
=
NULL
;
SSmaIdxKey
*
pSmaIdxKey
=
NULL
;
SMSmaCursor
*
pCur
=
metaOpenSmaCursor
(
pMeta
,
uid
);
...
...
@@ -529,7 +527,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
}
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
)
{
SArray
*
pUids
=
NULL
;
SArray
*
pUids
=
NULL
;
SSmaIdxKey
*
pSmaIdxKey
=
NULL
;
tb_uid_t
lastUid
=
0
;
...
...
@@ -591,13 +589,13 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
}
typedef
struct
{
SMeta
*
pMeta
;
TBC
*
pCur
;
SMeta
*
pMeta
;
TBC
*
pCur
;
tb_uid_t
suid
;
int16_t
cid
;
int16_t
type
;
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int32_t
kLen
;
int32_t
vLen
;
}
SIdxCursor
;
...
...
@@ -621,7 +619,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t
nKey
=
0
;
int32_t
nTagData
=
0
;
void
*
tagData
=
NULL
;
void
*
tagData
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
tagData
=
varDataVal
(
param
->
val
);
...
...
@@ -640,7 +638,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto
END
;
}
void
*
entryKey
=
NULL
,
*
entryVal
=
NULL
;
void
*
entryKey
=
NULL
,
*
entryVal
=
NULL
;
int32_t
nEntryKey
,
nEntryVal
;
while
(
1
)
{
valid
=
tdbTbcGet
(
pCursor
->
pCur
,
(
const
void
**
)
&
entryKey
,
&
nEntryKey
,
(
const
void
**
)
&
entryVal
,
&
nEntryVal
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
84aac9e3
...
...
@@ -141,10 +141,10 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
}
// this interface use suid instead of uid
pHandle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
msgIter
.
s
uid
,
sversion
,
true
);
pHandle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
msgIter
.
uid
,
sversion
,
true
);
if
(
pHandle
->
pSchemaWrapper
==
NULL
)
{
tqWarn
(
"cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table"
,
pHandle
->
msgIter
.
s
uid
,
pHandle
->
cachedSchemaVer
);
pHandle
->
msgIter
.
uid
,
pHandle
->
cachedSchemaVer
);
/*ASSERT(0);*/
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
84aac9e3
...
...
@@ -18,8 +18,8 @@
#define TSDB_MAX_SUBBLOCKS 8
typedef
struct
{
STable
*
pTable
;
S
SkipListIterato
r
*
pIter
;
STable
*
pTable
;
S
TbDataIte
r
*
pIter
;
}
SCommitIter
;
typedef
struct
{
...
...
@@ -58,26 +58,26 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static
void
tsdbStartCommit
(
STsdb
*
pRepo
);
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
);
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
);
static
void
tsdbSeekCommitIter
(
SCommitH
*
pCommith
,
TSKEY
key
);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
);
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
);
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToTable
(
SCommitH
*
pCommith
,
int
tid
);
static
bool
tsdbCommitIsSameFile
(
SCommitH
*
pCommith
,
int
bidx
);
static
int
tsdbMoveBlkIdx
(
SCommitH
*
pCommith
,
SBlockIdx
*
pIdx
);
static
int
tsdbSetCommitTable
(
SCommitH
*
pCommith
,
STable
*
pTable
);
static
int
tsdbComparKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
);
static
int
tsdbCommitMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
TSKEY
keyLimit
,
bool
toData
);
static
int
tsdbMergeMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
int
bidx
);
static
int
tsdbMoveBlock
(
SCommitH
*
pCommith
,
int
bidx
);
static
void
tsdbStartCommit
(
STsdb
*
pRepo
);
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
);
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
);
static
void
tsdbSeekCommitIter
(
SCommitH
*
pCommith
,
TSKEY
key
);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
);
static
int
32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
);
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToTable
(
SCommitH
*
pCommith
,
int
tid
);
static
bool
tsdbCommitIsSameFile
(
SCommitH
*
pCommith
,
int
bidx
);
static
int
tsdbMoveBlkIdx
(
SCommitH
*
pCommith
,
SBlockIdx
*
pIdx
);
static
int
tsdbSetCommitTable
(
SCommitH
*
pCommith
,
STable
*
pTable
);
static
int
tsdbComparKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
);
static
int
tsdbCommitMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
TSKEY
keyLimit
,
bool
toData
);
static
int
tsdbMergeMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
int
bidx
);
static
int
tsdbMoveBlock
(
SCommitH
*
pCommith
,
int
bidx
);
static
int
tsdbCommitAddBlock
(
SCommitH
*
pCommith
,
const
SBlock
*
pSupBlock
,
const
SBlock
*
pSubBlocks
,
int
nSubBlocks
);
static
int
tsdbMergeBlockData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
SDataCols
*
pDataCols
,
TSKEY
keyLimit
,
bool
isLastOneBlock
);
...
...
@@ -92,7 +92,7 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int
tsdbBegin
(
STsdb
*
pTsdb
)
{
if
(
!
pTsdb
)
return
0
;
S
Tsdb
MemTable
*
pMem
;
SMemTable
*
pMem
;
if
(
tsdbMemTableCreate
(
pTsdb
,
&
pTsdb
->
mem
)
<
0
)
{
return
-
1
;
...
...
@@ -244,7 +244,7 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
}
static
void
tsdbStartCommit
(
STsdb
*
pRepo
)
{
S
Tsdb
MemTable
*
pMem
=
pRepo
->
imem
;
SMemTable
*
pMem
=
pRepo
->
imem
;
tsdbInfo
(
"vgId:%d, start to commit"
,
REPO_ID
(
pRepo
));
...
...
@@ -400,7 +400,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
break
;
}
if
(
pIter
&&
pIter
->
pTable
&&
(
!
pIdx
||
(
pIter
->
pTable
->
uid
<=
pIdx
->
uid
)))
{
if
(
pIter
&&
pIter
->
pTable
&&
(
!
pIdx
||
(
pIter
->
pTable
->
suid
<=
pIdx
->
suid
||
pIter
->
pTable
->
uid
<=
pIdx
->
uid
)))
{
if
(
tsdbCommitToTable
(
pCommith
,
mIter
)
<
0
)
{
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
...
...
@@ -453,57 +453,48 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return
0
;
}
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbMemTable
*
pMem
=
pRepo
->
imem
;
SSkipListIterator
*
pSlIter
;
SCommitIter
*
pCommitIter
;
SSkipListNode
*
pNode
;
STbData
*
pTbData
;
STSchema
*
pTSchema
=
NULL
;
static
int32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
SMemTable
*
pMem
=
pRepo
->
imem
;
STbData
*
pTbData
;
SCommitIter
*
pCommitIter
;
STSchema
*
pTSchema
=
NULL
;
pCommith
->
niters
=
SL_SIZE
(
pMem
->
pSlIdx
);
pCommith
->
niters
=
taosArrayGetSize
(
pMem
->
aTbData
);
pCommith
->
iters
=
(
SCommitIter
*
)
taosMemoryCalloc
(
pCommith
->
niters
,
sizeof
(
SCommitIter
));
if
(
pCommith
->
iters
==
NULL
)
{
terrno
=
TSDB_CODE_TDB
_OUT_OF_MEMORY
;
return
-
1
;
code
=
TSDB_CODE
_OUT_OF_MEMORY
;
goto
_err
;
}
// Loop to create iters for each skiplist
pSlIter
=
tSkipListCreateIter
(
pMem
->
pSlIdx
);
if
(
pSlIter
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
pCommith
->
niters
;
i
++
)
{
tSkipListIterNext
(
pSlIter
);
pNode
=
tSkipListIterGet
(
pSlIter
);
pTbData
=
(
STbData
*
)
pNode
->
pData
;
for
(
int32_t
iIter
=
0
;
iIter
<
pCommith
->
niters
;
iIter
++
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMem
->
aTbData
,
iIter
);
pCommitIter
=
&
pCommith
->
iters
[
iIter
];
pCommitIter
=
pCommith
->
iters
+
i
;
pTSchema
=
metaGetTbTSchema
(
REPO_META
(
pRepo
),
pTbData
->
uid
,
-
1
);
if
(
pTSchema
)
{
pCommitIter
->
pIter
=
tSkipListCreateIter
(
pTbData
->
pData
);
tSkipListIterNext
(
pCommitIter
->
pIter
);
tsdbTbDataIterCreate
(
pTbData
,
NULL
,
0
,
&
pCommitIter
->
pIter
);
pCommitIter
->
pTable
=
(
STable
*
)
taosMemoryMalloc
(
sizeof
(
STable
));
pCommitIter
->
pTable
->
uid
=
pTbData
->
uid
;
pCommitIter
->
pTable
->
tid
=
pTbData
->
uid
;
pCommitIter
->
pTable
->
suid
=
pTbData
->
s
uid
;
pCommitIter
->
pTable
->
pSchema
=
pTSchema
;
pCommitIter
->
pTable
->
pCacheSchema
=
NULL
;
}
}
tSkipListDestroyIter
(
pSlIter
);
return
0
;
return
code
;
_err:
return
code
;
}
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
)
{
if
(
pCommith
->
iters
==
NULL
)
return
;
for
(
int
i
=
1
;
i
<
pCommith
->
niters
;
i
++
)
{
t
SkipListDestroyIter
(
pCommith
->
iters
[
i
].
pIter
);
t
sdbTbDataIterDestroy
(
pCommith
->
iters
[
i
].
pIter
);
if
(
pCommith
->
iters
[
i
].
pTable
)
{
tdFreeSchema
(
pCommith
->
iters
[
i
].
pTable
->
pSchema
);
tdFreeSchema
(
pCommith
->
iters
[
i
].
pTable
->
pCacheSchema
);
...
...
@@ -743,8 +734,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA,
pBlkInfo
=
*
ppBuf
;
pBlkInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
pBlkInfo
->
tid
=
TABLE_TID
(
pTable
)
;
pBlkInfo
->
uid
=
TABLE_UID
(
pTable
)
;
pBlkInfo
->
suid
=
pTable
->
suid
;
pBlkInfo
->
uid
=
pTable
->
uid
;
memcpy
((
void
*
)(
pBlkInfo
->
blocks
),
taosArrayGet
(
pSupA
,
0
),
nSupBlocks
*
sizeof
(
SBlock
));
if
(
nSubBlocks
>
0
)
{
...
...
@@ -770,7 +761,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA,
// Set pIdx
pBlock
=
taosArrayGetLast
(
pSupA
);
pIdx
->
uid
=
TABLE_UID
(
pTable
);
pIdx
->
suid
=
pTable
->
suid
;
pIdx
->
uid
=
pTable
->
uid
;
pIdx
->
hasLast
=
pBlock
->
last
?
1
:
0
;
pIdx
->
maxKey
=
pBlock
->
maxKey
;
pIdx
->
numOfBlocks
=
(
uint32_t
)
nSupBlocks
;
...
...
@@ -925,7 +917,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return
-
1
;
}
STable
table
=
{.
tid
=
pIdx
->
uid
,
.
uid
=
pIdx
->
uid
,
.
pSchema
=
NULL
};
STable
table
=
{.
suid
=
pIdx
->
s
uid
,
.
uid
=
pIdx
->
uid
,
.
pSchema
=
NULL
};
pCommith
->
pTable
=
&
table
;
while
(
bidx
<
nBlocks
)
{
...
...
@@ -1186,7 +1178,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
}
pBlockData
->
delimiter
=
TSDB_FILE_DELIMITER
;
pBlockData
->
uid
=
TABLE_UID
(
pTable
)
;
pBlockData
->
uid
=
pTable
->
uid
;
pBlockData
->
numOfCols
=
nColsNotAllNull
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pBlockData
,
tsize
);
...
...
@@ -1226,7 +1218,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
tsdbDebug
(
"vgId:%d, uid:%"
PRId64
" a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_UID
(
pTable
)
,
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
rowsToWrite
,
pBlock
->
len
,
REPO_ID
(
pRepo
),
pTable
->
uid
,
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
rowsToWrite
,
pBlock
->
len
,
pBlock
->
numOfCols
,
pBlock
->
minKey
.
ts
,
pBlock
->
maxKey
.
ts
);
return
0
;
...
...
@@ -1313,7 +1305,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
keyLimit
=
pBlock
[
1
].
minKey
.
ts
-
1
;
}
S
SkipListIterato
r
titer
=
*
(
pIter
->
pIter
);
S
TbDataIte
r
titer
=
*
(
pIter
->
pIter
);
if
(
tsdbLoadBlockDataCols
(
&
(
pCommith
->
readh
),
pBlock
,
NULL
,
&
colId
,
1
,
false
)
<
0
)
return
-
1
;
tsdbLoadDataFromCache
(
TSDB_COMMIT_REPO
(
pCommith
),
pIter
->
pTable
,
&
titer
,
keyLimit
,
INT32_MAX
,
NULL
,
...
...
@@ -1522,7 +1514,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
lastKey
=
key2
;
}
t
SkipList
IterNext
(
pCommitIter
->
pIter
);
t
sdbTbData
IterNext
(
pCommitIter
->
pIter
);
}
else
{
if
(
lastKey
!=
key1
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
...
...
@@ -1554,7 +1546,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
tdAppendSTSRowToDataCol
(
row
,
pSchema
,
pTarget
,
true
);
}
++
(
*
iter
);
t
SkipList
IterNext
(
pCommitIter
->
pIter
);
t
sdbTbData
IterNext
(
pCommitIter
->
pIter
);
}
if
(
pTarget
->
numOfRows
>=
(
maxRows
-
1
))
break
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit2.c
浏览文件 @
84aac9e3
...
...
@@ -16,19 +16,19 @@
#include "tsdb.h"
typedef
struct
{
SMemTable
*
pMemTable
;
int32_t
minutes
;
int8_t
precision
;
TSKEY
nCommitKey
;
int32_t
fid
;
TSKEY
minKey
;
TSKEY
maxKey
;
SReadH
readh
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SArray
*
aSubBlk
;
SArray
*
aDelInfo
;
SMemTable
2
*
pMemTable
;
int32_t
minutes
;
int8_t
precision
;
TSKEY
nCommitKey
;
int32_t
fid
;
TSKEY
minKey
;
TSKEY
maxKey
;
SReadH
readh
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SArray
*
aSubBlk
;
SArray
*
aDelInfo
;
}
SCommitH
;
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
);
...
...
@@ -39,7 +39,7 @@ int32_t tsdbBegin2(STsdb *pTsdb) {
int32_t
code
=
0
;
ASSERT
(
pTsdb
->
mem
==
NULL
);
code
=
tsdbMemTableCreate2
(
pTsdb
,
(
SMemTable
**
)
&
pTsdb
->
mem
);
code
=
tsdbMemTableCreate2
(
pTsdb
,
(
SMemTable
2
**
)
&
pTsdb
->
mem
);
if
(
code
)
{
tsdbError
(
"vgId:%d failed to begin TSDB since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
goto
_exit
;
...
...
@@ -80,8 +80,8 @@ _err:
}
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
pTsdb
->
mem
;
int32_t
code
=
0
;
SMemTable
2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
tsdbInfo
(
"vgId:%d start to commit"
,
TD_VID
(
pTsdb
->
pVnode
));
...
...
@@ -131,9 +131,9 @@ _err:
}
static
int32_t
tsdbCommitEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
pTsdb
->
imem
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SMemTable
2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
imem
;
// end transaction
code
=
tsdbEndFSTxn
(
pTsdb
);
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
84aac9e3
...
...
@@ -25,8 +25,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"meta"
,
// TSDB_FILE_META
};
static
void
tsdbGetFilename
(
int
vid
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
,
const
char
*
dname
,
char
*
fname
);
// static int tsdbRollBackMFile(SMFile *pMFile);
static
void
tsdbGetFilename
(
int
vid
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
,
const
char
*
dname
,
char
*
fname
);
static
int
tsdbEncodeDFInfo
(
void
**
buf
,
SDFInfo
*
pInfo
);
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
);
static
int
tsdbRollBackDFile
(
SDFile
*
pDFile
);
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
84aac9e3
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
浏览文件 @
84aac9e3
...
...
@@ -15,11 +15,6 @@
#include "tsdb.h"
struct
SMemSkipListNode
{
int8_t
level
;
SMemSkipListNode
*
forwards
[
0
];
};
typedef
struct
{
tb_uid_t
uid
;
STSchema
*
pTSchema
;
...
...
@@ -35,21 +30,21 @@ typedef struct {
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
);
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable
2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
);
static
int
memDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
static
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int8_t
tsdbMemSkipListRandLevel
(
SMemSkipList
*
pSl
);
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable
2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
);
static
void
memDataMovePosTo
(
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBKEY
*
pKey
,
int32_t
flags
);
// SMemTable ==============================================
int32_t
tsdbMemTableCreate2
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
)
{
int32_t
code
=
0
;
SMemTable
*
pMemTable
=
NULL
;
int32_t
tsdbMemTableCreate2
(
STsdb
*
pTsdb
,
SMemTable
2
**
ppMemTable
)
{
int32_t
code
=
0
;
SMemTable
2
*
pMemTable
=
NULL
;
pMemTable
=
(
SMemTable
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pMemTable
));
pMemTable
=
(
SMemTable
2
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pMemTable
));
if
(
pMemTable
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -75,16 +70,16 @@ _err:
return
code
;
}
void
tsdbMemTableDestroy2
(
SMemTable
*
pMemTable
)
{
void
tsdbMemTableDestroy2
(
SMemTable
2
*
pMemTable
)
{
taosArrayDestroyEx
(
pMemTable
->
aMemData
,
NULL
/*TODO*/
);
taosMemoryFree
(
pMemTable
);
}
int32_t
tsdbInsertTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
TSDBROW
row
=
{.
version
=
version
};
int32_t
code
=
0
;
SMemTable
2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
TSDBROW
row
=
{.
version
=
version
};
ASSERT
(
pMemTable
);
ASSERT
(
pSubmitBlk
->
nData
>
0
);
...
...
@@ -112,10 +107,10 @@ _err:
}
int32_t
tsdbDeleteTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
SVBufPool
*
pPool
=
pTsdb
->
pVnode
->
inUse
;
int32_t
code
=
0
;
SMemTable
2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
SVBufPool
*
pPool
=
pTsdb
->
pVnode
->
inUse
;
ASSERT
(
pMemTable
);
...
...
@@ -250,7 +245,7 @@ void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) {
}
}
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
)
{
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable
2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
)
{
int32_t
code
=
0
;
int32_t
idx
=
0
;
SMemData
*
pMemDataT
=
&
(
SMemData
){.
suid
=
suid
,
.
uid
=
uid
};
...
...
@@ -421,7 +416,7 @@ static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY
}
}
static
int32_t
memDataDoPut
(
SMemTable
*
pMemTable
,
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBROW
*
pRow
,
static
int32_t
memDataDoPut
(
SMemTable
2
*
pMemTable
,
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBROW
*
pRow
,
int8_t
forward
)
{
int32_t
code
=
0
;
int8_t
level
;
...
...
@@ -475,7 +470,7 @@ _exit:
return
code
;
}
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable
2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
int32_t
n
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
84aac9e3
...
...
@@ -67,15 +67,16 @@ enum {
};
typedef
struct
STableCheckInfo
{
uint64_t
tableId
;
TSKEY
lastKey
;
SBlockInfo
*
pCompInfo
;
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
uint8_t
chosen
:
2
;
// indicate which iterator should move forward
bool
initBuf
:
1
;
// whether to initialize the in-memory skip list iterator or not
SSkipListIterator
*
iter
;
// mem buffer skip list iterator
SSkipListIterator
*
iiter
;
// imem buffer skip list iterator
uint64_t
suid
;
uint64_t
tableId
;
TSKEY
lastKey
;
SBlockInfo
*
pCompInfo
;
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
uint8_t
chosen
:
2
;
// indicate which iterator should move forward
bool
initBuf
:
1
;
// whether to initialize the in-memory skip list iterator or not
STbDataIter
*
iter
;
// mem buffer skip list iterator
STbDataIter
*
iiter
;
// imem buffer skip list iterator
}
STableCheckInfo
;
typedef
struct
STableBlockInfo
{
...
...
@@ -107,6 +108,7 @@ typedef struct SBlockLoadSuppInfo {
typedef
struct
STsdbReadHandle
{
STsdb
*
pTsdb
;
uint64_t
suid
;
SQueryFilePos
cur
;
// current position
int16_t
order
;
STimeWindow
window
;
// the primary query time window that applies to all queries
...
...
@@ -200,8 +202,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
pHandle
;
int64_t
rows
=
0
;
S
Tsdb
MemTable
*
pMemTable
=
NULL
;
// pTsdbReadHandle->pMemTable;
int64_t
rows
=
0
;
SMemTable
*
pMemTable
=
NULL
;
// pTsdbReadHandle->pMemTable;
if
(
pMemTable
==
NULL
)
{
return
rows
;
}
...
...
@@ -237,6 +239,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
pTableList
->
pTableList
,
j
);
STableCheckInfo
info
=
{.
lastKey
=
pKeyInfo
->
lastKey
,
.
tableId
=
pKeyInfo
->
uid
};
info
.
suid
=
pTsdbReadHandle
->
suid
;
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
if
(
info
.
lastKey
==
INT64_MIN
||
info
.
lastKey
<
pTsdbReadHandle
->
window
.
skey
)
{
info
.
lastKey
=
pTsdbReadHandle
->
window
.
skey
;
...
...
@@ -265,8 +268,8 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
(
STableCheckInfo
*
)
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
i
);
pCheckInfo
->
lastKey
=
pTsdbReadHandle
->
window
.
skey
;
pCheckInfo
->
iter
=
t
SkipListDestroyIter
(
pCheckInfo
->
iter
);
pCheckInfo
->
iiter
=
t
SkipListDestroyIter
(
pCheckInfo
->
iiter
);
pCheckInfo
->
iter
=
t
sdbTbDataIterDestroy
(
pCheckInfo
->
iter
);
pCheckInfo
->
iiter
=
t
sdbTbDataIterDestroy
(
pCheckInfo
->
iiter
);
pCheckInfo
->
initBuf
=
false
;
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
...
...
@@ -387,6 +390,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
pReadHandle
->
locateStart
=
false
;
pReadHandle
->
loadType
=
pCond
->
type
;
pReadHandle
->
suid
=
pCond
->
suid
;
pReadHandle
->
outputCapacity
=
4096
;
//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
pReadHandle
->
loadExternalRow
=
pCond
->
loadExternalRows
;
pReadHandle
->
currentLoadExternalRows
=
pCond
->
loadExternalRows
;
...
...
@@ -658,7 +662,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
}
#if 0
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, S
Tsdb
MemTable* pMemRef) {
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) {
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
if (pTsdbReadHandle == NULL) {
return NULL;
...
...
@@ -752,23 +756,22 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
pCheckInfo
->
initBuf
=
true
;
int32_t
order
=
pHandle
->
order
;
STbData
**
pMem
=
NULL
;
STbData
**
pIMem
=
NULL
;
STbData
*
pMem
=
NULL
;
STbData
*
pIMem
=
NULL
;
int8_t
backward
=
(
pHandle
->
order
==
TSDB_ORDER_DESC
)
?
1
:
0
;
TSKEY
tLastKey
=
keyToTkey
(
pCheckInfo
->
lastKey
);
if
(
pHandle
->
pTsdb
->
mem
!=
NULL
)
{
pMem
=
taosHashGet
(
pHandle
->
pTsdb
->
mem
->
pHashIdx
,
&
pCheckInfo
->
tableId
,
sizeof
(
pCheckInfo
->
tableId
)
);
tsdbGetTbDataFromMemTable
(
pHandle
->
pTsdb
->
mem
,
pCheckInfo
->
suid
,
pCheckInfo
->
tableId
,
&
pMem
);
if
(
pMem
!=
NULL
)
{
pCheckInfo
->
iter
=
tSkipListCreateIterFromVal
((
*
pMem
)
->
pData
,
(
const
char
*
)
&
tLastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
tsdbTbDataIterCreate
(
pMem
,
&
(
TSDBKEY
){.
version
=
0
,
.
ts
=
tLastKey
},
backward
,
&
pCheckInfo
->
iter
);
}
}
if
(
pHandle
->
pTsdb
->
imem
!=
NULL
)
{
pIMem
=
taosHashGet
(
pHandle
->
pTsdb
->
imem
->
pHashIdx
,
&
pCheckInfo
->
tableId
,
sizeof
(
pCheckInfo
->
tableId
)
);
tsdbGetTbDataFromMemTable
(
pHandle
->
pTsdb
->
mem
,
pCheckInfo
->
suid
,
pCheckInfo
->
tableId
,
&
pIMem
);
if
(
pIMem
!=
NULL
)
{
pCheckInfo
->
iiter
=
tSkipListCreateIterFromVal
((
*
pIMem
)
->
pData
,
(
const
char
*
)
&
tLastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
tsdbTbDataIterCreate
(
pIMem
,
&
(
TSDBKEY
){.
version
=
0
,
.
ts
=
tLastKey
},
backward
,
&
pCheckInfo
->
iiter
);
}
}
...
...
@@ -777,22 +780,23 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
return
false
;
}
bool
memEmpty
=
(
pCheckInfo
->
iter
==
NULL
)
||
(
pCheckInfo
->
iter
!=
NULL
&&
!
tSkipListIterNext
(
pCheckInfo
->
iter
));
bool
imemEmpty
=
(
pCheckInfo
->
iiter
==
NULL
)
||
(
pCheckInfo
->
iiter
!=
NULL
&&
!
tSkipListIterNext
(
pCheckInfo
->
iiter
));
bool
memEmpty
=
(
pCheckInfo
->
iter
==
NULL
)
||
(
pCheckInfo
->
iter
!=
NULL
&&
!
tsdbTbDataIterGet
(
pCheckInfo
->
iter
,
NULL
));
bool
imemEmpty
=
(
pCheckInfo
->
iiter
==
NULL
)
||
(
pCheckInfo
->
iiter
!=
NULL
&&
!
tsdbTbDataIterGet
(
pCheckInfo
->
iiter
,
NULL
));
if
(
memEmpty
&&
imemEmpty
)
{
// buffer is empty
return
false
;
}
if
(
!
memEmpty
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
assert
(
node
!=
NULL
);
TSDBROW
row
;
STSRow
*
row
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
TD_ROW_KEY
(
row
)
;
// first timestamp in buffer
tsdbTbDataIterGet
(
pCheckInfo
->
iter
,
&
row
);
TSKEY
key
=
row
.
pTSRow
->
ts
;
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", check data in mem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
", %s"
,
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
(
*
pMem
)
->
keyMin
,
(
*
pMem
)
->
keyMax
,
pCheckInfo
->
lastKey
,
(
*
pMem
)
->
nrows
,
pHandle
->
idStr
);
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
pMem
->
minKey
.
ts
,
pMem
->
maxKey
.
ts
,
pCheckInfo
->
lastKey
,
pMem
->
sl
.
size
,
pHandle
->
idStr
);
if
(
ASCENDING_TRAVERSE
(
order
))
{
assert
(
pCheckInfo
->
lastKey
<=
key
);
...
...
@@ -805,15 +809,14 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
}
if
(
!
imemEmpty
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
assert
(
node
!=
NULL
);
TSDBROW
row
;
STSRow
*
row
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
TD_ROW_KEY
(
row
)
;
// first timestamp in buffer
tsdbTbDataIterGet
(
pCheckInfo
->
iter
,
&
row
);
TSKEY
key
=
row
.
pTSRow
->
ts
;
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", check data in imem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
", %s"
,
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
(
*
pIMem
)
->
keyMin
,
(
*
pIMem
)
->
keyMax
,
pCheckInfo
->
lastKey
,
(
*
pIMem
)
->
nrows
,
pHandle
->
idStr
);
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
pIMem
->
minKey
.
ts
,
pIMem
->
maxKey
.
ts
,
pCheckInfo
->
lastKey
,
pIMem
->
sl
.
size
,
pHandle
->
idStr
);
if
(
ASCENDING_TRAVERSE
(
order
))
{
assert
(
pCheckInfo
->
lastKey
<=
key
);
...
...
@@ -828,31 +831,23 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
}
static
void
destroyTableMemIterator
(
STableCheckInfo
*
pCheckInfo
)
{
t
SkipListDestroyIter
(
pCheckInfo
->
iter
);
t
SkipListDestroyIter
(
pCheckInfo
->
iiter
);
t
sdbTbDataIterDestroy
(
pCheckInfo
->
iter
);
t
sdbTbDataIterDestroy
(
pCheckInfo
->
iiter
);
}
static
TSKEY
extractFirstTraverseKey
(
STableCheckInfo
*
pCheckInfo
,
int32_t
order
,
int32_t
update
,
TDRowVerT
maxVer
)
{
TSDBROW
row
=
{
0
};
STSRow
*
rmem
=
NULL
,
*
rimem
=
NULL
;
if
(
pCheckInfo
->
iter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
if
(
node
!=
NULL
)
{
rmem
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
// TODO: filter max version
// if (TD_ROW_VER(rmem) > maxVer) {
// rmem = NULL;
// }
if
(
tsdbTbDataIterGet
(
pCheckInfo
->
iter
,
&
row
))
{
rmem
=
row
.
pTSRow
;
}
}
if
(
pCheckInfo
->
iiter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
if
(
node
!=
NULL
)
{
rimem
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
// TODO: filter max version
// if (TD_ROW_VER(rimem) > maxVer) {
// rimem = NULL;
// }
if
(
tsdbTbDataIterGet
(
pCheckInfo
->
iiter
,
&
row
))
{
rimem
=
row
.
pTSRow
;
}
}
...
...
@@ -889,7 +884,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_BOTH
;
}
else
{
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_IMEM
;
t
SkipList
IterNext
(
pCheckInfo
->
iter
);
t
sdbTbData
IterNext
(
pCheckInfo
->
iter
);
}
return
r1
;
}
else
if
(
r1
<
r2
&&
ASCENDING_TRAVERSE
(
order
))
{
...
...
@@ -903,28 +898,17 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
static
STSRow
*
getSRowInTableMem
(
STableCheckInfo
*
pCheckInfo
,
int32_t
order
,
int32_t
update
,
STSRow
**
extraRow
,
TDRowVerT
maxVer
)
{
TSDBROW
row
;
STSRow
*
rmem
=
NULL
,
*
rimem
=
NULL
;
if
(
pCheckInfo
->
iter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
if
(
node
!=
NULL
)
{
rmem
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rmem) > maxVer) {
rmem = NULL;
}
#endif
if
(
tsdbTbDataIterGet
(
pCheckInfo
->
iter
,
&
row
))
{
rmem
=
row
.
pTSRow
;
}
}
if
(
pCheckInfo
->
iiter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
if
(
node
!=
NULL
)
{
rimem
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rimem) > maxVer) {
rimem = NULL;
}
#endif
if
(
tsdbTbDataIterGet
(
pCheckInfo
->
iiter
,
&
row
))
{
rimem
=
row
.
pTSRow
;
}
}
...
...
@@ -966,7 +950,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
*
extraRow
=
rimem
;
return
rmem
;
}
else
{
t
SkipList
IterNext
(
pCheckInfo
->
iter
);
t
sdbTbData
IterNext
(
pCheckInfo
->
iter
);
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_IMEM
;
return
rimem
;
}
...
...
@@ -995,7 +979,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
bool
hasNext
=
false
;
if
(
pCheckInfo
->
chosen
==
CHECKINFO_CHOSEN_MEM
)
{
if
(
pCheckInfo
->
iter
!=
NULL
)
{
hasNext
=
t
SkipList
IterNext
(
pCheckInfo
->
iter
);
hasNext
=
t
sdbTbData
IterNext
(
pCheckInfo
->
iter
);
}
if
(
hasNext
)
{
...
...
@@ -1003,11 +987,11 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
}
if
(
pCheckInfo
->
iiter
!=
NULL
)
{
return
t
SkipListIterGet
(
pCheckInfo
->
iiter
)
!=
NULL
;
return
t
sdbTbDataIterGet
(
pCheckInfo
->
iiter
,
NULL
)
;
}
}
else
if
(
pCheckInfo
->
chosen
==
CHECKINFO_CHOSEN_IMEM
)
{
if
(
pCheckInfo
->
iiter
!=
NULL
)
{
hasNext
=
t
SkipList
IterNext
(
pCheckInfo
->
iiter
);
hasNext
=
t
sdbTbData
IterNext
(
pCheckInfo
->
iiter
);
}
if
(
hasNext
)
{
...
...
@@ -1015,14 +999,14 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
}
if
(
pCheckInfo
->
iter
!=
NULL
)
{
return
t
SkipListIterGet
(
pCheckInfo
->
iter
)
!=
NULL
;
return
t
sdbTbDataIterGet
(
pCheckInfo
->
iter
,
NULL
)
;
}
}
else
{
if
(
pCheckInfo
->
iter
!=
NULL
)
{
hasNext
=
t
SkipList
IterNext
(
pCheckInfo
->
iter
);
hasNext
=
t
sdbTbData
IterNext
(
pCheckInfo
->
iter
);
}
if
(
pCheckInfo
->
iiter
!=
NULL
)
{
hasNext
=
t
SkipList
IterNext
(
pCheckInfo
->
iiter
)
||
hasNext
;
hasNext
=
t
sdbTbData
IterNext
(
pCheckInfo
->
iiter
)
||
hasNext
;
}
}
...
...
@@ -1126,7 +1110,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
index
);
pCheckInfo
->
numOfBlocks
=
0
;
STable
table
=
{.
uid
=
pCheckInfo
->
tableId
,
.
tid
=
pCheckInfo
->
tableI
d
};
STable
table
=
{.
uid
=
pCheckInfo
->
tableId
,
.
suid
=
pCheckInfo
->
sui
d
};
table
.
pSchema
=
pTsdbReadHandle
->
pSchema
;
if
(
tsdbSetReadTable
(
&
pTsdbReadHandle
->
rhelper
,
&
table
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2924,7 +2908,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
// current result is empty
if
(
pTsdbReadHandle
->
currentLoadExternalRows
&&
pTsdbReadHandle
->
window
.
skey
==
pTsdbReadHandle
->
window
.
ekey
&&
pTsdbReadHandle
->
cur
.
rows
==
0
)
{
// S
Tsdb
MemTable* pMemRef = pTsdbReadHandle->pMemTable;
// SMemTable* pMemRef = pTsdbReadHandle->pMemTable;
// doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
// doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
...
...
@@ -3222,7 +3206,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
}
}
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, S
Tsdb
MemTable* pMemRef) {
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) {
// STsdbReadHandle* pSecQueryHandle = NULL;
//
// if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
84aac9e3
...
...
@@ -156,6 +156,24 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
return
0
;
}
static
int32_t
tsdbBlockIdxCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SBlockIdx
*
pBlockIdx1
=
(
SBlockIdx
*
)
p1
;
SBlockIdx
*
pBlockIdx2
=
(
SBlockIdx
*
)
p2
;
if
(
pBlockIdx1
->
suid
<
pBlockIdx2
->
suid
)
{
return
-
1
;
}
else
if
(
pBlockIdx1
->
suid
>
pBlockIdx2
->
suid
)
{
return
1
;
}
if
(
pBlockIdx1
->
uid
<
pBlockIdx2
->
uid
)
{
return
-
1
;
}
else
if
(
pBlockIdx1
->
uid
>
pBlockIdx2
->
uid
)
{
return
1
;
}
return
0
;
}
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
)
{
STSchema
*
pSchema
=
tsdbGetTableSchemaImpl
(
TSDB_READ_REPO
(
pReadh
),
pTable
,
false
,
false
,
-
1
);
...
...
@@ -171,33 +189,40 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
return
-
1
;
}
size_t
size
=
taosArrayGetSize
(
pReadh
->
aBlkIdx
);
if
(
size
>
0
)
{
while
(
true
)
{
if
(
pReadh
->
cidx
>=
size
)
{
pReadh
->
pBlkIdx
=
NULL
;
break
;
}
SBlockIdx
*
pBlkIdx
=
taosArrayGet
(
pReadh
->
aBlkIdx
,
pReadh
->
cidx
);
if
(
pBlkIdx
->
uid
==
TABLE_TID
(
pTable
))
{
if
(
pBlkIdx
->
uid
==
TABLE_UID
(
pTable
))
{
pReadh
->
pBlkIdx
=
pBlkIdx
;
}
else
{
pReadh
->
pBlkIdx
=
NULL
;
}
pReadh
->
cidx
++
;
break
;
}
else
if
(
pBlkIdx
->
uid
>
TABLE_TID
(
pTable
))
{
pReadh
->
pBlkIdx
=
NULL
;
break
;
}
else
{
pReadh
->
cidx
++
;
}
}
}
else
{
uint8_t
*
p
=
taosArraySearch
(
pReadh
->
aBlkIdx
,
&
(
SBlockIdx
){.
suid
=
pTable
->
suid
,
.
uid
=
pTable
->
uid
},
tsdbBlockIdxCmprFn
,
TD_EQ
);
if
(
p
==
NULL
)
{
pReadh
->
pBlkIdx
=
NULL
;
}
}
else
{
pReadh
->
pBlkIdx
=
(
SBlockIdx
*
)
p
;
}
// size_t size = taosArrayGetSize(pReadh->aBlkIdx);
// if (size > 0) {
// while (true) {
// if (pReadh->cidx >= size) {
// pReadh->pBlkIdx = NULL;
// break;
// }
// SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
// if (pBlkIdx->uid == TABLE_TID(pTable)) {
// if (pBlkIdx->uid == TABLE_UID(pTable)) {
// pReadh->pBlkIdx = pBlkIdx;
// } else {
// pReadh->pBlkIdx = NULL;
// }
// pReadh->cidx++;
// break;
// } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
// pReadh->pBlkIdx = NULL;
// break;
// } else {
// pReadh->cidx++;
// }
// }
// } else {
// pReadh->pBlkIdx = NULL;
// }
return
0
;
}
...
...
@@ -553,12 +578,12 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
)
{
int
tlen
=
0
;
// tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen
+=
taosEncodeFixedU64
(
buf
,
pIdx
->
suid
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pIdx
->
uid
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
len
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
offset
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pIdx
->
hasLast
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
numOfBlocks
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pIdx
->
uid
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pIdx
->
maxKey
.
ts
);
return
tlen
;
...
...
@@ -570,6 +595,10 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint64_t
value
=
0
;
// if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if
((
buf
=
taosDecodeFixedU64
(
buf
,
&
value
))
==
NULL
)
return
NULL
;
pIdx
->
suid
=
(
int64_t
)
value
;
if
((
buf
=
taosDecodeFixedU64
(
buf
,
&
value
))
==
NULL
)
return
NULL
;
pIdx
->
uid
=
(
int64_t
)
value
;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
len
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
offset
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeFixedU8
(
buf
,
&
(
hasLast
)))
==
NULL
)
return
NULL
;
...
...
@@ -577,8 +606,6 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
numOfBlocks
)))
==
NULL
)
return
NULL
;
pIdx
->
numOfBlocks
=
numOfBlocks
;
if
((
buf
=
taosDecodeFixedU64
(
buf
,
&
value
))
==
NULL
)
return
NULL
;
pIdx
->
uid
=
(
int64_t
)
value
;
if
((
buf
=
taosDecodeFixedU64
(
buf
,
&
value
))
==
NULL
)
return
NULL
;
pIdx
->
maxKey
.
ts
=
(
TSKEY
)
value
;
return
buf
;
...
...
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
84aac9e3
...
...
@@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
SSubmitBlkRsp
r
=
{
0
};
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
if
(
tsdbInsertTableData
(
pTsdb
,
&
msgIter
,
pBlock
,
&
r
)
<
0
)
{
if
(
tsdbInsertTableData
(
pTsdb
,
version
,
&
msgIter
,
pBlock
,
&
r
)
<
0
)
{
return
-
1
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
84aac9e3
...
...
@@ -799,7 +799,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
sprintf
(
submitBlkRsp
.
tblFName
,
"%s."
,
pVnode
->
config
.
dbname
);
}
if
(
tsdbInsertTableData
(
pVnode
->
pTsdb
,
&
msgIter
,
pBlock
,
&
submitBlkRsp
)
<
0
)
{
if
(
tsdbInsertTableData
(
pVnode
->
pTsdb
,
version
,
&
msgIter
,
pBlock
,
&
submitBlkRsp
)
<
0
)
{
submitBlkRsp
.
code
=
terrno
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
84aac9e3
...
...
@@ -4760,6 +4760,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond
->
numOfTWindows
=
1
;
pCond
->
twindows
=
taosMemoryCalloc
(
pCond
->
numOfTWindows
,
sizeof
(
STimeWindow
));
pCond
->
twindows
[
0
]
=
pTableScanNode
->
scanRange
;
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
#if 1
// todo work around a problem, remove it later
...
...
source/libs/parser/inc/parInsertData.h
浏览文件 @
84aac9e3
...
...
@@ -116,8 +116,7 @@ static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo
}
static
FORCE_INLINE
int32_t
setBlockInfo
(
SSubmitBlk
*
pBlocks
,
STableDataBlocks
*
dataBuf
,
int32_t
numOfRows
)
{
pBlocks
->
suid
=
(
TSDB_NORMAL_TABLE
==
dataBuf
->
pTableMeta
->
tableType
?
dataBuf
->
pTableMeta
->
uid
:
dataBuf
->
pTableMeta
->
suid
);
pBlocks
->
suid
=
(
TSDB_NORMAL_TABLE
==
dataBuf
->
pTableMeta
->
tableType
?
0
:
dataBuf
->
pTableMeta
->
suid
);
pBlocks
->
uid
=
dataBuf
->
pTableMeta
->
uid
;
pBlocks
->
sversion
=
dataBuf
->
pTableMeta
->
sversion
;
pBlocks
->
schemaLen
=
dataBuf
->
createTbReqLen
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录