Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ef0036e3
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ef0036e3
编写于
5月 23, 2022
作者:
L
Liu Jicong
提交者:
GitHub
5月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12872 from taosdata/feature/tq
refacotr: remove tq meta store
上级
06896e94
930b8500
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
1 addition
and
1194 deletion
+1
-1194
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+0
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+0
-163
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-129
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+0
-622
source/dnode/vnode/test/tqMetaTest.cpp
source/dnode/vnode/test/tqMetaTest.cpp
+0
-279
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
ef0036e3
...
@@ -48,7 +48,6 @@ target_sources(
...
@@ -48,7 +48,6 @@ target_sources(
# tq
# tq
"src/tq/tq.c"
"src/tq/tq.c"
"src/tq/tqCommit.c"
"src/tq/tqCommit.c"
"src/tq/tqMetaStore.c"
"src/tq/tqOffset.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqPush.c"
"src/tq/tqRead.c"
"src/tq/tqRead.c"
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
ef0036e3
...
@@ -41,45 +41,6 @@ extern "C" {
...
@@ -41,45 +41,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// clang-format on
#define TQ_BUFFER_SIZE 4
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096
// key + offset + size
#define TQ_IDX_SIZE 24
// 4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
// 24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
// 4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static
inline
bool
tqUpdateAppend
(
int32_t
tqConfigFlag
)
{
return
tqConfigFlag
&
TQ_UPDATE_APPEND
;
}
static
inline
bool
tqDupIntxnReject
(
int32_t
tqConfigFlag
)
{
return
tqConfigFlag
&
TQ_DUP_INTXN_REJECT
;
}
static
const
int8_t
TQ_CONST_DELETE
=
TQ_ACTION_CONST
;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef
enum
{
TQ_ITEM_READY
,
TQ_ITEM_PROCESS
,
TQ_ITEM_EMPTY
}
STqItemStatus
;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
...
@@ -98,53 +59,6 @@ struct STqReadHandle {
...
@@ -98,53 +59,6 @@ struct STqReadHandle {
STSchema
*
pSchema
;
STSchema
*
pSchema
;
};
};
typedef
struct
{
int16_t
ver
;
int16_t
action
;
int32_t
checksum
;
int64_t
ssize
;
char
content
[];
}
STqSerializedHead
;
typedef
int32_t
(
*
FTqSerialize
)(
const
void
*
pObj
,
STqSerializedHead
**
ppHead
);
typedef
int32_t
(
*
FTqDeserialize
)(
void
*
self
,
const
STqSerializedHead
*
pHead
,
void
**
ppObj
);
typedef
void
(
*
FTqDelete
)(
void
*
);
typedef
struct
{
int64_t
key
;
int64_t
offset
;
int64_t
serializedSize
;
void
*
valueInUse
;
void
*
valueInTxn
;
}
STqMetaHandle
;
typedef
struct
STqMetaList
{
STqMetaHandle
handle
;
struct
STqMetaList
*
next
;
// struct STqMetaList* inTxnPrev;
// struct STqMetaList* inTxnNext;
struct
STqMetaList
*
unpersistPrev
;
struct
STqMetaList
*
unpersistNext
;
}
STqMetaList
;
typedef
struct
{
STQ
*
pTq
;
STqMetaList
*
bucket
[
TQ_BUCKET_SIZE
];
// a table head
STqMetaList
*
unpersistHead
;
// topics that are not connectted
STqMetaList
*
unconnectTopic
;
TdFilePtr
pFile
;
TdFilePtr
pIdxFile
;
char
*
dirPath
;
int32_t
tqConfigFlag
;
FTqSerialize
pSerializer
;
FTqDeserialize
pDeserializer
;
FTqDelete
pDeleter
;
}
STqMetaStore
;
typedef
struct
{
typedef
struct
{
int64_t
consumerId
;
int64_t
consumerId
;
int32_t
epoch
;
int32_t
epoch
;
...
@@ -189,87 +103,10 @@ typedef struct {
...
@@ -189,87 +103,10 @@ typedef struct {
static
STqMgmt
tqMgmt
;
static
STqMgmt
tqMgmt
;
typedef
struct
{
int8_t
status
;
int64_t
offset
;
qTaskInfo_t
task
;
STqReadHandle
*
pReadHandle
;
}
STqTaskItem
;
// new version
typedef
struct
{
int64_t
firstOffset
;
int64_t
lastOffset
;
STqTaskItem
output
[
TQ_BUFFER_SIZE
];
}
STqBuffer
;
typedef
struct
{
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
char
*
qmsg
;
STqBuffer
buffer
;
SWalReadHandle
*
pReadhandle
;
}
STqTopic
;
typedef
struct
{
int64_t
consumerId
;
int32_t
epoch
;
char
cgroup
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
topics
;
// SArray<STqTopic>
}
STqConsumer
;
typedef
struct
{
int8_t
type
;
int8_t
nodeType
;
int8_t
reserved
[
6
];
int64_t
streamId
;
qTaskInfo_t
task
;
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
int8_t
inited
;
tmr_h
timer
;
}
STqPushMgmt
;
static
STqPushMgmt
tqPushMgmt
;
// init once
// init once
int
tqInit
();
int
tqInit
();
void
tqCleanUp
();
void
tqCleanUp
();
// open in each vnode
// required by vnode
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
static
int
FORCE_INLINE
tqQueryExecuting
(
int32_t
status
)
{
return
status
;
}
// tqMetaStore.h
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
pSerializer
,
FTqDeserialize
pDeserializer
,
FTqDelete
pDeleter
,
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
STqMetaStore
*
);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
STqMetaStore
*
);
// clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
STqMetaStore
*
);
void
*
tqHandleGet
(
STqMetaStore
*
,
int64_t
key
);
// make it unpersist
void
*
tqHandleTouchGet
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePurge
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
STqMetaStore
*
,
int64_t
key
);
// tqOffset
// tqOffset
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
ef0036e3
...
@@ -214,7 +214,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
...
@@ -214,7 +214,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO
error handle
// TODO
handle sma error
}
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
if
(
data
==
NULL
)
{
...
@@ -230,134 +230,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
...
@@ -230,134 +230,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
int
tqCommit
(
STQ
*
pTq
)
{
int
tqCommit
(
STQ
*
pTq
)
{
// do nothing
// do nothing
/*return tqStorePersist(pTq->tqMeta);*/
return
0
;
}
int32_t
tqGetTopicHandleSize
(
const
STqTopic
*
pTopic
)
{
return
strlen
(
pTopic
->
topicName
)
+
strlen
(
pTopic
->
sql
)
+
strlen
(
pTopic
->
physicalPlan
)
+
strlen
(
pTopic
->
qmsg
)
+
sizeof
(
int64_t
)
*
3
;
}
int32_t
tqGetConsumerHandleSize
(
const
STqConsumer
*
pConsumer
)
{
int
num
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
0
;
for
(
int
i
=
0
;
i
<
num
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
sz
+=
tqGetTopicHandleSize
(
pTopic
);
}
return
sz
;
}
static
FORCE_INLINE
int32_t
tEncodeSTqTopic
(
void
**
buf
,
const
STqTopic
*
pTopic
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pTopic
->
topicName
);
/*tlen += taosEncodeString(buf, pTopic->sql);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen
+=
taosEncodeString
(
buf
,
pTopic
->
qmsg
);
/*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/
/*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/
/*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/
return
tlen
;
}
static
FORCE_INLINE
const
void
*
tDecodeSTqTopic
(
const
void
*
buf
,
STqTopic
*
pTopic
)
{
buf
=
taosDecodeStringTo
(
buf
,
pTopic
->
topicName
);
/*buf = taosDecodeString(buf, &pTopic->sql);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf
=
taosDecodeString
(
buf
,
&
pTopic
->
qmsg
);
/*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/
/*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/
/*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/
return
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSTqConsumer
(
void
**
buf
,
const
STqConsumer
*
pConsumer
)
{
int32_t
sz
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
tlen
+=
tEncodeSTqTopic
(
buf
,
pTopic
);
}
return
tlen
;
}
static
FORCE_INLINE
const
void
*
tDecodeSTqConsumer
(
const
void
*
buf
,
STqConsumer
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
STqTopic
));
if
(
pConsumer
->
topics
==
NULL
)
return
NULL
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
pTopic
;
buf
=
tDecodeSTqTopic
(
buf
,
&
pTopic
);
taosArrayPush
(
pConsumer
->
topics
,
&
pTopic
);
}
return
buf
;
}
int
tqSerializeConsumer
(
const
STqConsumer
*
pConsumer
,
STqSerializedHead
**
ppHead
)
{
int32_t
sz
=
tEncodeSTqConsumer
(
NULL
,
pConsumer
);
if
(
sz
>
(
*
ppHead
)
->
ssize
)
{
void
*
tmpPtr
=
taosMemoryRealloc
(
*
ppHead
,
sizeof
(
STqSerializedHead
)
+
sz
);
if
(
tmpPtr
==
NULL
)
{
taosMemoryFree
(
*
ppHead
);
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
*
ppHead
=
tmpPtr
;
(
*
ppHead
)
->
ssize
=
sz
;
}
void
*
ptr
=
(
*
ppHead
)
->
content
;
void
*
abuf
=
ptr
;
tEncodeSTqConsumer
(
&
abuf
,
pConsumer
);
return
0
;
}
int32_t
tqDeserializeConsumer
(
STQ
*
pTq
,
const
STqSerializedHead
*
pHead
,
STqConsumer
**
ppConsumer
)
{
const
void
*
str
=
pHead
->
content
;
*
ppConsumer
=
taosMemoryCalloc
(
1
,
sizeof
(
STqConsumer
));
if
(
*
ppConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
tDecodeSTqConsumer
(
str
,
*
ppConsumer
)
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
STqConsumer
*
pConsumer
=
*
ppConsumer
;
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
pTopic
->
pReadhandle
=
walOpenReadHandle
(
pTq
->
pWal
);
if
(
pTopic
->
pReadhandle
==
NULL
)
{
ASSERT
(
false
);
}
for
(
int
j
=
0
;
j
<
TQ_BUFFER_SIZE
;
j
++
)
{
pTopic
->
buffer
.
output
[
j
].
status
=
0
;
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTopic
->
buffer
.
output
[
j
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
j
].
task
=
qCreateStreamExecTaskInfo
(
pTopic
->
qmsg
,
&
handle
);
}
}
return
0
;
return
0
;
}
}
...
...
source/dnode/vnode/src/tq/tqMetaStore.c
已删除
100644 → 0
浏览文件 @
06896e94
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
// #include "osDir.h"
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
static
int32_t
tqHandlePutCommitted
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
void
*
tqHandleGetUncommitted
(
STqMetaStore
*
,
int64_t
key
);
static
inline
void
tqLinkUnpersist
(
STqMetaStore
*
pMeta
,
STqMetaList
*
pNode
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
}
static
inline
int64_t
tqSeekLastPage
(
TdFilePtr
pFile
)
{
int
offset
=
taosLSeekFile
(
pFile
,
0
,
SEEK_END
);
int
pageNo
=
offset
/
TQ_PAGE_SIZE
;
int
curPageOffset
=
pageNo
*
TQ_PAGE_SIZE
;
return
taosLSeekFile
(
pFile
,
curPageOffset
,
SEEK_SET
);
}
// TODO: the struct is tightly coupled with index entry
typedef
struct
STqIdxPageHead
{
int16_t
writeOffset
;
int8_t
unused
[
14
];
}
STqIdxPageHead
;
typedef
struct
STqIdxPageBuf
{
STqIdxPageHead
head
;
char
buffer
[
TQ_IDX_PAGE_BODY_SIZE
];
}
STqIdxPageBuf
;
static
inline
int
tqReadLastPage
(
TdFilePtr
pFile
,
STqIdxPageBuf
*
pBuf
)
{
int
offset
=
tqSeekLastPage
(
pFile
);
int
nBytes
;
if
((
nBytes
=
taosReadFile
(
pFile
,
pBuf
,
TQ_PAGE_SIZE
))
==
-
1
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
nBytes
==
0
)
{
memset
(
pBuf
,
0
,
TQ_PAGE_SIZE
);
pBuf
->
head
.
writeOffset
=
TQ_IDX_PAGE_HEAD_SIZE
;
}
ASSERT
(
nBytes
==
0
||
nBytes
==
pBuf
->
head
.
writeOffset
);
return
taosLSeekFile
(
pFile
,
offset
,
SEEK_SET
);
}
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
serializer
,
FTqDeserialize
deserializer
,
FTqDelete
deleter
,
int32_t
tqConfigFlag
)
{
STqMetaStore
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
STqMetaStore
));
if
(
pMeta
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
NULL
;
}
pMeta
->
pTq
=
pTq
;
// concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
pMeta
->
dirPath
=
taosMemoryMalloc
(
pathLen
+
1
);
if
(
pMeta
->
dirPath
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
taosMemoryFree
(
pMeta
);
return
NULL
;
}
strcpy
(
pMeta
->
dirPath
,
path
);
char
*
name
=
taosMemoryMalloc
(
pathLen
+
10
);
strcpy
(
name
,
path
);
if
(
!
taosDirExist
(
name
)
&&
taosMkDir
(
name
)
!=
0
)
{
terrno
=
TSDB_CODE_TQ_FAILED_TO_CREATE_DIR
;
tqError
(
"failed to create dir:%s since %s "
,
name
,
terrstr
());
}
strcat
(
name
,
"/"
TQ_IDX_NAME
);
TdFilePtr
pIdxFile
=
taosOpenFile
(
name
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
);
if
(
pIdxFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tqError
(
"failed to open file:%s since %s "
,
name
,
terrstr
());
// free memory
taosMemoryFree
(
name
);
return
NULL
;
}
pMeta
->
pIdxFile
=
pIdxFile
;
pMeta
->
unpersistHead
=
taosMemoryCalloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
taosMemoryFree
(
name
);
return
NULL
;
}
pMeta
->
unpersistHead
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistPrev
=
pMeta
->
unpersistHead
;
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
TdFilePtr
pFile
=
taosOpenFile
(
name
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tqError
(
"failed to open file:%s since %s"
,
name
,
terrstr
());
taosMemoryFree
(
name
);
return
NULL
;
}
taosMemoryFree
(
name
);
pMeta
->
pFile
=
pFile
;
pMeta
->
pSerializer
=
serializer
;
pMeta
->
pDeserializer
=
deserializer
;
pMeta
->
pDeleter
=
deleter
;
pMeta
->
tqConfigFlag
=
tqConfigFlag
;
// read idx file and load into memory
STqIdxPageBuf
idxBuf
;
STqSerializedHead
*
serializedObj
=
taosMemoryMalloc
(
TQ_PAGE_SIZE
);
if
(
serializedObj
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
}
int
idxRead
;
int
allocated
=
TQ_PAGE_SIZE
;
bool
readEnd
=
false
;
while
((
idxRead
=
taosReadFile
(
pIdxFile
,
&
idxBuf
,
TQ_PAGE_SIZE
)))
{
if
(
idxRead
==
-
1
)
{
// TODO: handle error
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tqError
(
"failed to read tq index file since %s"
,
terrstr
());
}
ASSERT
(
idxBuf
.
head
.
writeOffset
==
idxRead
);
// loop read every entry
for
(
int
i
=
0
;
i
<
idxBuf
.
head
.
writeOffset
-
TQ_IDX_PAGE_HEAD_SIZE
;
i
+=
TQ_IDX_SIZE
)
{
STqMetaList
*
pNode
=
taosMemoryCalloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// TODO: free memory
}
memcpy
(
&
pNode
->
handle
,
&
idxBuf
.
buffer
[
i
],
TQ_IDX_SIZE
);
taosLSeekFile
(
pFile
,
pNode
->
handle
.
offset
,
SEEK_SET
);
if
(
allocated
<
pNode
->
handle
.
serializedSize
)
{
void
*
ptr
=
taosMemoryRealloc
(
serializedObj
,
pNode
->
handle
.
serializedSize
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// TODO: free memory
}
serializedObj
=
ptr
;
allocated
=
pNode
->
handle
.
serializedSize
;
}
serializedObj
->
ssize
=
pNode
->
handle
.
serializedSize
;
if
(
taosReadFile
(
pFile
,
serializedObj
,
pNode
->
handle
.
serializedSize
)
!=
pNode
->
handle
.
serializedSize
)
{
// TODO: read error
}
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INTXN
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE_CONT
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
STqSerializedHead
*
ptr
=
POINTER_SHIFT
(
serializedObj
,
serializedObj
->
ssize
);
if
(
ptr
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
pTq
,
ptr
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
{
ASSERT
(
0
);
}
// put into list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketNode
==
NULL
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
if
(
pBucketNode
->
handle
.
key
==
pNode
->
handle
.
key
)
{
pNode
->
next
=
pBucketNode
->
next
;
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
{
while
(
pBucketNode
->
next
&&
pBucketNode
->
next
->
handle
.
key
!=
pNode
->
handle
.
key
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
)
{
ASSERT
(
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
);
STqMetaList
*
pNodeFound
=
pBucketNode
->
next
;
pNode
->
next
=
pNodeFound
->
next
;
pBucketNode
->
next
=
pNode
;
pBucketNode
=
pNodeFound
;
}
else
{
pNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
pBucketNode
=
NULL
;
}
}
if
(
pBucketNode
)
{
if
(
pBucketNode
->
handle
.
valueInUse
&&
pBucketNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pBucketNode
->
handle
.
valueInUse
);
}
if
(
pBucketNode
->
handle
.
valueInTxn
&&
pBucketNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pBucketNode
->
handle
.
valueInTxn
);
}
taosMemoryFree
(
pBucketNode
);
}
}
}
taosMemoryFree
(
serializedObj
);
return
pMeta
;
}
int32_t
tqStoreClose
(
STqMetaStore
*
pMeta
)
{
// commit data and idx
tqStorePersist
(
pMeta
);
ASSERT
(
pMeta
->
unpersistHead
&&
pMeta
->
unpersistHead
->
next
==
NULL
);
taosCloseFile
(
&
pMeta
->
pFile
);
taosCloseFile
(
&
pMeta
->
pIdxFile
);
// free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
STqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
while
(
pNode
)
{
ASSERT
(
pNode
->
unpersistNext
==
NULL
);
ASSERT
(
pNode
->
unpersistPrev
==
NULL
);
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
STqMetaList
*
next
=
pNode
->
next
;
taosMemoryFree
(
pNode
);
pNode
=
next
;
}
}
taosMemoryFree
(
pMeta
->
dirPath
);
taosMemoryFree
(
pMeta
->
unpersistHead
);
taosMemoryFree
(
pMeta
);
return
0
;
}
int32_t
tqStoreDelete
(
STqMetaStore
*
pMeta
)
{
taosCloseFile
(
&
pMeta
->
pFile
);
taosCloseFile
(
&
pMeta
->
pIdxFile
);
// free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
STqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
STqMetaList
*
next
=
pNode
->
next
;
taosMemoryFree
(
pNode
);
pNode
=
next
;
}
}
taosMemoryFree
(
pMeta
->
unpersistHead
);
taosRemoveDir
(
pMeta
->
dirPath
);
taosMemoryFree
(
pMeta
->
dirPath
);
taosMemoryFree
(
pMeta
);
return
0
;
}
int32_t
tqStorePersist
(
STqMetaStore
*
pMeta
)
{
STqIdxPageBuf
idxBuf
;
int64_t
*
bufPtr
=
(
int64_t
*
)
idxBuf
.
buffer
;
STqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
STqMetaList
*
pNode
=
pHead
->
unpersistNext
;
STqSerializedHead
*
pSHead
=
taosMemoryMalloc
(
sizeof
(
STqSerializedHead
));
if
(
pSHead
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
pSHead
->
ver
=
TQ_SVER
;
pSHead
->
checksum
=
0
;
pSHead
->
ssize
=
sizeof
(
STqSerializedHead
);
/*int allocatedSize = sizeof(STqSerializedHead);*/
int
offset
=
taosLSeekFile
(
pMeta
->
pFile
,
0
,
SEEK_CUR
);
tqReadLastPage
(
pMeta
->
pIdxFile
,
&
idxBuf
);
if
(
idxBuf
.
head
.
writeOffset
==
TQ_PAGE_SIZE
)
{
taosLSeekFile
(
pMeta
->
pIdxFile
,
0
,
SEEK_END
);
memset
(
&
idxBuf
,
0
,
TQ_PAGE_SIZE
);
idxBuf
.
head
.
writeOffset
=
TQ_IDX_PAGE_HEAD_SIZE
;
}
else
{
bufPtr
=
POINTER_SHIFT
(
&
idxBuf
,
idxBuf
.
head
.
writeOffset
);
}
while
(
pHead
!=
pNode
)
{
int
nBytes
=
0
;
if
(
pNode
->
handle
.
valueInUse
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pSHead
->
action
=
TQ_ACTION_INUSE_CONT
;
}
else
{
pSHead
->
action
=
TQ_ACTION_INUSE
;
}
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
STqSerializedHead
);
}
else
{
pMeta
->
pSerializer
(
pNode
->
handle
.
valueInUse
,
&
pSHead
);
}
nBytes
=
taosWriteFile
(
pMeta
->
pFile
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytes
==
pSHead
->
ssize
);
}
if
(
pNode
->
handle
.
valueInTxn
)
{
pSHead
->
action
=
TQ_ACTION_INTXN
;
if
(
pNode
->
handle
.
valueInTxn
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
STqSerializedHead
);
}
else
{
pMeta
->
pSerializer
(
pNode
->
handle
.
valueInTxn
,
&
pSHead
);
}
int
nBytesTxn
=
taosWriteFile
(
pMeta
->
pFile
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytesTxn
==
pSHead
->
ssize
);
nBytes
+=
nBytesTxn
;
}
pNode
->
handle
.
offset
=
offset
;
offset
+=
nBytes
;
// write idx file
// TODO: endian check and convert
*
(
bufPtr
++
)
=
pNode
->
handle
.
key
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
(
int64_t
)
nBytes
;
idxBuf
.
head
.
writeOffset
+=
TQ_IDX_SIZE
;
if
(
idxBuf
.
head
.
writeOffset
>=
TQ_PAGE_SIZE
)
{
nBytes
=
taosWriteFile
(
pMeta
->
pIdxFile
,
&
idxBuf
,
TQ_PAGE_SIZE
);
// TODO: handle error with tfile
ASSERT
(
nBytes
==
TQ_PAGE_SIZE
);
memset
(
&
idxBuf
,
0
,
TQ_PAGE_SIZE
);
idxBuf
.
head
.
writeOffset
=
TQ_IDX_PAGE_HEAD_SIZE
;
bufPtr
=
(
int64_t
*
)
&
idxBuf
.
buffer
;
}
// remove from unpersist list
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
pHead
->
unpersistNext
->
unpersistPrev
=
pHead
;
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
=
pHead
->
unpersistNext
;
// remove from bucket
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
&&
pNode
->
handle
.
valueInTxn
==
NULL
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
->
next
;
}
else
{
STqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
// impossible for pBucket->next == NULL
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
}
taosMemoryFree
(
pNode
);
}
}
// write left bytes
taosMemoryFree
(
pSHead
);
// TODO: write new version in tfile
if
((
char
*
)
bufPtr
!=
idxBuf
.
buffer
)
{
int
nBytes
=
taosWriteFile
(
pMeta
->
pIdxFile
,
&
idxBuf
,
idxBuf
.
head
.
writeOffset
);
// TODO: handle error in tfile
ASSERT
(
nBytes
==
idxBuf
.
head
.
writeOffset
);
}
// TODO: using fsync in tfile
taosFsyncFile
(
pMeta
->
pIdxFile
);
taosFsyncFile
(
pMeta
->
pFile
);
return
0
;
}
static
int32_t
tqHandlePutCommitted
(
STqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
// change pointer ownership
pNode
->
handle
.
valueInUse
=
value
;
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
STqMetaList
*
pNewNode
=
taosMemoryCalloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNewNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInUse
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
// put into unpersist list
pNewNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pNewNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNewNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNewNode
;
return
0
;
}
void
*
tqHandleGet
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
return
pNode
->
handle
.
valueInUse
;
}
else
{
return
NULL
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
return
NULL
;
}
void
*
tqHandleTouchGet
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
tqLinkUnpersist
(
pMeta
,
pNode
);
return
pNode
->
handle
.
valueInUse
;
}
else
{
return
NULL
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
return
NULL
;
}
static
inline
int32_t
tqHandlePutImpl
(
STqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
if
(
tqDupIntxnReject
(
pMeta
->
tqConfigFlag
))
{
terrno
=
TSDB_CODE_TQ_META_KEY_DUP_IN_TXN
;
return
-
1
;
}
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
}
pNode
->
handle
.
valueInTxn
=
value
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
STqMetaList
*
pNewNode
=
taosMemoryCalloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNewNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
}
int32_t
tqHandleMovePut
(
STqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
return
tqHandlePutImpl
(
pMeta
,
key
,
value
);
}
int32_t
tqHandleCopyPut
(
STqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
taosMemoryMalloc
(
vsize
);
if
(
vmem
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
return
tqHandlePutImpl
(
pMeta
,
key
,
vmem
);
}
static
void
*
tqHandleGetUncommitted
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
return
pNode
->
handle
.
valueInTxn
;
}
else
{
return
NULL
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
return
NULL
;
}
int32_t
tqHandleCommit
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_META_KEY_NOT_IN_TXN
;
return
-
1
;
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
pNode
->
handle
.
valueInUse
=
pNode
->
handle
.
valueInTxn
;
pNode
->
handle
.
valueInTxn
=
NULL
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
terrno
=
TSDB_CODE_TQ_META_NO_SUCH_KEY
;
return
-
1
;
}
int32_t
tqHandleAbort
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
NULL
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
terrno
=
TSDB_CODE_TQ_META_KEY_NOT_IN_TXN
;
return
-
1
;
}
else
{
pNode
=
pNode
->
next
;
}
}
terrno
=
TSDB_CODE_TQ_META_NO_SUCH_KEY
;
return
-
1
;
}
int32_t
tqHandleDel
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
terrno
=
TSDB_CODE_TQ_META_NO_SUCH_KEY
;
return
-
1
;
}
int32_t
tqHandlePurge
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
terrno
=
TSDB_CODE_TQ_META_NO_SUCH_KEY
;
return
-
1
;
}
// TODO: clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
STqMetaStore
*
pMeta
)
{
return
0
;
}
source/dnode/vnode/test/tqMetaTest.cpp
已删除
100644 → 0
浏览文件 @
06896e94
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tqMetaStore.h"
struct
Foo
{
int32_t
a
;
};
int
FooSerializer
(
const
void
*
pObj
,
STqSerializedHead
**
ppHead
)
{
Foo
*
foo
=
(
Foo
*
)
pObj
;
if
((
*
ppHead
)
==
NULL
||
(
*
ppHead
)
->
ssize
<
sizeof
(
STqSerializedHead
)
+
sizeof
(
int32_t
))
{
*
ppHead
=
(
STqSerializedHead
*
)
taosMemoryRealloc
(
*
ppHead
,
sizeof
(
STqSerializedHead
)
+
sizeof
(
int32_t
));
(
*
ppHead
)
->
ssize
=
sizeof
(
STqSerializedHead
)
+
sizeof
(
int32_t
);
}
*
(
int32_t
*
)(
*
ppHead
)
->
content
=
foo
->
a
;
return
(
*
ppHead
)
->
ssize
;
}
const
void
*
FooDeserializer
(
const
STqSerializedHead
*
pHead
,
void
**
ppObj
)
{
if
(
*
ppObj
==
NULL
)
{
*
ppObj
=
taosMemoryRealloc
(
*
ppObj
,
sizeof
(
int32_t
));
}
Foo
*
pFoo
=
*
(
Foo
**
)
ppObj
;
pFoo
->
a
=
*
(
int32_t
*
)
pHead
->
content
;
return
NULL
;
}
void
FooDeleter
(
void
*
pObj
)
{
taosMemoryFree
(
pObj
);
}
class
TqMetaUpdateAppendTest
:
public
::
testing
::
Test
{
protected:
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
}
void
TearDown
()
override
{
tqStoreClose
(
pMeta
);
}
STqMetaStore
*
pMeta
;
const
char
*
pathName
=
TD_TMP_DIR_PATH
"tq_test"
;
};
TEST_F
(
TqMetaUpdateAppendTest
,
copyPutTest
)
{
Foo
foo
;
foo
.
a
=
3
;
tqHandleCopyPut
(
pMeta
,
1
,
&
foo
,
sizeof
(
Foo
));
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
persistTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pFoo
->
a
=
2
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
Foo
*
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
->
a
,
pFoo
->
a
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pBar
!=
NULL
,
true
);
EXPECT_EQ
(
pBar
->
a
,
2
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
uncommittedTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
abortTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleAbort
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
deleteTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleDel
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
intxnPersist
)
{
Foo
*
pFoo
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
tqHandleCommit
(
pMeta
,
1
);
Foo
*
pBar
=
(
Foo
*
)
taosMemoryMalloc
(
sizeof
(
Foo
));
pBar
->
a
=
4
;
tqHandleMovePut
(
pMeta
,
1
,
pBar
);
Foo
*
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
3
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
3
);
tqHandleCommit
(
pMeta
,
1
);
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
4
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
4
);
}
TEST_F
(
TqMetaUpdateAppendTest
,
multiplePage
)
{
taosSeedRand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
taosRand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
500
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
TEST_F
(
TqMetaUpdateAppendTest
,
multipleRewrite
)
{
taosSeedRand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
taosRand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
500
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
v
[
i
]
=
taosRand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
v
[
i
]
=
taosRand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
}
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
v
[
i
]
=
taosRand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
tqHandleCommit
(
pMeta
,
i
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
TEST_F
(
TqMetaUpdateAppendTest
,
dupCommit
)
{
taosSeedRand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
taosRand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
0
);
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
-
1
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
-
1
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录