Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ca2d1f8b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ca2d1f8b
编写于
6月 08, 2022
作者:
G
Ganlin Zhao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into enh/histogram_split
上级
ea05f8af
a4ac8b1c
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
457 addition
and
1433 deletion
+457
-1433
include/client/taos.h
include/client/taos.h
+21
-21
include/util/talgo.h
include/util/talgo.h
+3
-4
source/client/src/tmq.c
source/client/src/tmq.c
+13
-9
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+8
-0
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+0
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+0
-51
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-13
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+3
-0
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+321
-83
source/dnode/vnode/src/tsdb/tsdbCommit2.c
source/dnode/vnode/src/tsdb/tsdbCommit2.c
+0
-436
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+0
-177
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
+0
-530
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+0
-27
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+15
-11
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+5
-0
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+1
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+4
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+20
-0
source/util/src/talgo.c
source/util/src/talgo.c
+31
-65
source/util/test/CMakeLists.txt
source/util/test/CMakeLists.txt
+8
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+3
-3
未找到文件。
include/client/taos.h
浏览文件 @
ca2d1f8b
...
...
@@ -103,10 +103,10 @@ typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef
struct
TAOS_MULTI_BIND
{
int
buffer_type
;
void
*
buffer
;
void
*
buffer
;
uintptr_t
buffer_length
;
int32_t
*
length
;
char
*
is_null
;
int32_t
*
length
;
char
*
is_null
;
int
num
;
}
TAOS_MULTI_BIND
;
...
...
@@ -130,7 +130,7 @@ DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
...
...
@@ -147,17 +147,17 @@ DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name
DLL_EXPORT
int
taos_stmt_get_tag_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_get_col_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
DLL_EXPORT
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
DLL_EXPORT
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_close
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows_once
(
TAOS_STMT
*
stmt
);
...
...
@@ -179,11 +179,11 @@ DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
);
DLL_EXPORT
int
taos_fetch_block_s
(
TAOS_RES
*
res
,
int
*
numOfRows
,
TAOS_ROW
*
rows
);
DLL_EXPORT
int
taos_fetch_raw_block
(
TAOS_RES
*
res
,
int
*
numOfRows
,
void
**
pData
);
DLL_EXPORT
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
);
DLL_EXPORT
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
);
DLL_EXPORT
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
void
taos_reset_current_db
(
TAOS
*
taos
);
DLL_EXPORT
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
);
DLL_EXPORT
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_ROW
*
taos_result_block
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
taos_get_server_info
(
TAOS
*
taos
);
...
...
@@ -204,17 +204,17 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
,
int
keepProgress
);
#endif
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
/* --------------------------TMQ INTERFACE------------------------------- */
enum
tmq_resp_err_t
{
enum
{
TMQ_RESP_ERR__FAIL
=
-
1
,
TMQ_RESP_ERR__SUCCESS
=
0
,
};
typedef
enum
tmq_resp_err
_t
tmq_resp_err_t
;
typedef
int32
_t
tmq_resp_err_t
;
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_topic_vgroup_t
tmq_topic_vgroup_t
;
...
...
@@ -229,7 +229,7 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
DLL_EXPORT
int32_t
tmq_list_get_size
(
const
tmq_list_t
*
);
DLL_EXPORT
char
**
tmq_list_to_c_array
(
const
tmq_list_t
*
);
DLL_EXPORT
char
**
tmq_list_to_c_array
(
const
tmq_list_t
*
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
...
...
@@ -240,7 +240,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
tmq_resp_err_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
DLL_EXPORT
tmq_resp_err_t
tmq_subscription
(
tmq_t
*
tmq
,
tmq_list_t
**
topics
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
);
DLL_EXPORT
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
);
...
...
@@ -260,7 +260,7 @@ enum tmq_conf_res_t {
typedef
enum
tmq_conf_res_t
tmq_conf_res_t
;
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
void
tmq_conf_set_auto_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
);
...
...
include/util/talgo.h
浏览文件 @
ca2d1f8b
...
...
@@ -65,7 +65,7 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
* @param flags
* @return
*/
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int
64_t
nmemb
,
int64_t
size
,
__compar_fn_t
fn
,
int32_t
flags
);
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int
32_t
nmemb
,
int32_t
size
,
__compar_fn_t
compar
,
int32_t
flags
);
/**
* adjust heap
...
...
@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
* @return
*/
void
taosheapadjust
(
void
*
base
,
int32_t
size
,
int32_t
start
,
int32_t
end
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
);
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
);
/**
* sort heap to make sure it is a max/min root heap
...
...
@@ -97,8 +97,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
* @param maxroot: if heap is max root heap
* @return
*/
void
taosheapsort
(
void
*
base
,
int32_t
size
,
int32_t
len
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
bool
maxroot
);
void
taosheapsort
(
void
*
base
,
int32_t
size
,
int32_t
len
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
bool
maxroot
);
#ifdef __cplusplus
}
...
...
source/client/src/tmq.c
浏览文件 @
ca2d1f8b
...
...
@@ -323,7 +323,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
int32_t
tmqCommitCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
pParam
->
rspErr
=
code
==
0
?
TMQ_RESP_ERR__SUCCESS
:
TMQ_RESP_ERR__FAIL
;
pParam
->
rspErr
=
code
;
if
(
pParam
->
async
)
{
if
(
pParam
->
automatic
&&
pParam
->
tmq
->
commitCb
)
{
pParam
->
tmq
->
commitCb
(
pParam
->
tmq
,
pParam
->
rspErr
,
(
tmq_topic_vgroup_list_t
*
)
pParam
->
offsets
,
...
...
@@ -432,12 +432,13 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
code
=
pParam
->
rspErr
;
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
}
else
{
code
=
0
;
}
// avoid double free if msg is sent
buf
=
NULL
;
code
=
0
;
END:
if
(
buf
)
taosMemoryFree
(
buf
);
/*if (pParam) taosMemoryFree(pParam);*/
...
...
@@ -445,9 +446,9 @@ END:
if
(
code
!=
0
&&
async
)
{
if
(
automatic
)
{
tmq
->
commitCb
(
tmq
,
TMQ_RESP_ERR__FAIL
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
tmq
->
commitCbUserParam
);
tmq
->
commitCb
(
tmq
,
code
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
tmq
->
commitCbUserParam
);
}
else
{
userCb
(
tmq
,
TMQ_RESP_ERR__FAIL
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
userParam
);
userCb
(
tmq
,
code
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
userParam
);
}
}
...
...
@@ -1474,16 +1475,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
)
{
if
(
tmq
->
status
==
TMQ_CONSUMER_STATUS__READY
)
{
tmq_resp_err_t
rsp
=
tmq_commit_sync
(
tmq
,
NULL
);
if
(
rsp
==
TMQ_RESP_ERR__FAIL
)
{
return
TMQ_RESP_ERR__FAIL
;
if
(
rsp
!=
TMQ_RESP_ERR__SUCCESS
)
{
return
rsp
;
}
tmq_list_t
*
lst
=
tmq_list_new
();
rsp
=
tmq_subscribe
(
tmq
,
lst
);
tmq_list_destroy
(
lst
);
if
(
rsp
==
TMQ_RESP_ERR__FAIL
)
{
return
TMQ_RESP_ERR__FAIL
;
if
(
rsp
!=
TMQ_RESP_ERR__SUCCESS
)
{
return
rsp
;
}
}
// TODO: free resources
...
...
@@ -1493,8 +1494,11 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
const
char
*
tmq_err2str
(
tmq_resp_err_t
err
)
{
if
(
err
==
TMQ_RESP_ERR__SUCCESS
)
{
return
"success"
;
}
else
if
(
err
==
TMQ_RESP_ERR__FAIL
)
{
return
"fail"
;
}
else
{
return
tstrerror
(
err
);
}
return
"fail"
;
}
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
ca2d1f8b
...
...
@@ -92,6 +92,14 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
pLostMsg
->
consumerId
);
ASSERT
(
pConsumer
);
mInfo
(
"receive consumer lost msg, consumer id %ld, status %s"
,
pLostMsg
->
consumerId
,
mndConsumerStatusName
(
pConsumer
->
status
));
if
(
pConsumer
->
status
!=
MQ_CONSUMER_STATUS__READY
)
{
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
SMqConsumerObj
*
pConsumerNew
=
tNewSMqConsumerObj
(
pConsumer
->
consumerId
,
pConsumer
->
cgroup
);
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__LOST
;
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
ca2d1f8b
...
...
@@ -36,12 +36,10 @@ target_sources(
# tsdb
"src/tsdb/tsdbCommit.c"
# "src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c"
# "src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbWrite.c"
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
ca2d1f8b
...
...
@@ -58,28 +58,6 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, ST
bool
tsdbTbDataIterNext
(
STbDataIter
*
pIter
);
bool
tsdbTbDataIterGet
(
STbDataIter
*
pIter
,
TSDBROW
*
pRow
);
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
// tsdbMemTable2.c ==============================================================================================
// typedef struct SMemTable2 SMemTable2;
// typedef struct SMemData SMemData;
// typedef struct SMemDataIter SMemDataIter;
// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable);
// void tsdbMemTableDestroy2(SMemTable2 *pMemTable);
// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
// /* SMemDataIter */
// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
// bool tsdbMemDataIterNext(SMemDataIter *pIter);
// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// // tsdbCommit2.c ==============================================================================================
// int32_t tsdbBegin2(STsdb *pTsdb);
// int32_t tsdbCommit2(STsdb *pTsdb);
// tsdbFile.c ==============================================================================================
typedef
int32_t
TSDB_FILE_T
;
typedef
struct
SDFInfo
SDFInfo
;
...
...
@@ -700,17 +678,6 @@ typedef struct {
TSKEY
eKey
;
}
SDelInfo
;
struct
SMemTable2
{
STsdb
*
pTsdb
;
int32_t
nRef
;
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
int64_t
nRows
;
int64_t
nDelOp
;
SArray
*
aSkmInfo
;
SArray
*
aMemData
;
};
static
FORCE_INLINE
int
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
TSDBKEY
*
pKey1
=
(
TSDBKEY
*
)
p1
;
TSDBKEY
*
pKey2
=
(
TSDBKEY
*
)
p2
;
...
...
@@ -730,24 +697,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
return
0
;
}
struct
SMemData
{
tb_uid_t
suid
;
tb_uid_t
uid
;
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
SDelOp
*
delOpHead
;
SDelOp
*
delOpTail
;
SMemSkipList
sl
;
};
struct
SMemDataIter
{
STbData
*
pMemData
;
int8_t
backward
;
TSDBROW
*
pRow
;
SMemSkipListNode
*
pNode
;
// current node
TSDBROW
row
;
};
struct
STbDataIter
{
STbData
*
pTbData
;
int8_t
backward
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
ca2d1f8b
...
...
@@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
);
int
tsdbClose
(
STsdb
**
pTsdb
);
int
tsdbBegin
(
STsdb
*
pTsdb
);
int
32_t
tsdbBegin
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
...
...
@@ -161,18 +161,6 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void
tdUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tdUidStoreFree
(
STbUidStore
*
pStore
);
#if 0
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
#endif
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
ca2d1f8b
...
...
@@ -39,6 +39,9 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs
static
int32_t
tqAddBlockSchemaToRsp
(
const
STqExecHandle
*
pExec
,
int32_t
workerId
,
SMqDataBlkRsp
*
pRsp
)
{
SSchemaWrapper
*
pSW
=
tCloneSSchemaWrapper
(
pExec
->
pExecReader
[
workerId
]
->
pSchemaWrapper
);
if
(
pSW
==
NULL
)
{
return
-
1
;
}
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
return
0
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
ca2d1f8b
...
...
@@ -28,6 +28,8 @@ typedef struct {
int
niters
;
// memory iterators
SCommitIter
*
iters
;
bool
isRFileSet
;
// read and commit FSET
int32_t
fid
;
SDFileSet
*
pSet
;
SReadH
readh
;
SDFileSet
wSet
;
bool
isDFileSame
;
...
...
@@ -58,8 +60,12 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static
void
tsdbStartCommit
(
STsdb
*
pRepo
);
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
);
static
int32_t
tsdbCommitData
(
SCommitH
*
pCommith
);
static
int32_t
tsdbCommitDel
(
SCommitH
*
pCommith
);
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCommith
);
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitH
*
pCHandle
);
static
int32_t
tsdbEndCommit
(
SCommitH
*
pCHandle
,
int
eno
);
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
);
static
void
tsdbSeekCommitIter
(
SCommitH
*
pCommith
,
TSKEY
key
);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
...
...
@@ -67,7 +73,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith);
static
int32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
);
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToTable
(
SCommitH
*
pCommith
,
int
tid
);
static
bool
tsdbCommitIsSameFile
(
SCommitH
*
pCommith
,
int
bidx
);
...
...
@@ -88,8 +93,11 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
int
tsdbWriteBlockIdx
(
SDFile
*
pHeadf
,
SArray
*
pIdxA
,
void
**
ppBuf
);
static
int
tsdbApplyRtnOnFSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
static
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
int
tsdbBegin
(
STsdb
*
pTsdb
)
{
int
32_t
tsdbBegin
(
STsdb
*
pTsdb
)
{
if
(
!
pTsdb
)
return
0
;
SMemTable
*
pMem
;
...
...
@@ -112,15 +120,50 @@ int32_t tsdbCommit(STsdb *pTsdb) {
pTsdb
->
mem
=
NULL
;
// start commit
tsdbStartCommit
(
pTsdb
);
if
(
tsdbInitCommitH
(
&
commith
,
pTsdb
)
<
0
)
{
return
-
1
;
code
=
tsdbStartCommit
(
pTsdb
,
&
commith
);
if
(
code
)
{
goto
_err
;
}
// commit impl
code
=
tsdbCommitData
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbCommitDel
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbCommitCache
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
// end commit
code
=
tsdbEndCommit
(
&
commith
,
0
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
tsdbError
(
"vgId:%d failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitData
(
SCommitH
*
pCommith
)
{
int32_t
fid
;
SDFileSet
*
pSet
=
NULL
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
TSDB_COMMIT_REPO
(
pCommith
);
// Skip expired memory data and expired FSET
tsdbSeekCommitIter
(
&
commith
,
commith
.
rtn
.
minKey
);
while
((
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
))))
{
if
(
pSet
->
fid
<
commith
.
rtn
.
minFid
)
{
tsdbSeekCommitIter
(
pCommith
,
pCommith
->
rtn
.
minKey
);
while
((
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
))))
{
if
(
pSet
->
fid
<
pCommith
->
rtn
.
minFid
)
{
tsdbInfo
(
"vgId:%d, FSET %d on level %d disk id %d expires, remove it"
,
REPO_ID
(
pTsdb
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
}
else
{
...
...
@@ -129,7 +172,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
}
// commit
fid
=
tsdbNextCommitFid
(
&
(
commith
)
);
fid
=
tsdbNextCommitFid
(
pCommith
);
while
(
true
)
{
// Loop over both on disk and memory
if
(
pSet
==
NULL
&&
fid
==
TSDB_IVLD_FID
)
break
;
...
...
@@ -137,12 +180,12 @@ int32_t tsdbCommit(STsdb *pTsdb) {
if
(
pSet
&&
(
fid
==
TSDB_IVLD_FID
||
pSet
->
fid
<
fid
))
{
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
if
(
tsdbApplyRtnOnFSet
(
pTsdb
,
pSet
,
&
(
commith
.
rtn
))
<
0
)
{
tsdbDestroyCommitH
(
&
c
ommith
);
if
(
tsdbApplyRtnOnFSet
(
TSDB_COMMIT_REPO
(
pCommith
),
pSet
,
&
(
pCommith
->
rtn
))
<
0
)
{
tsdbDestroyCommitH
(
pC
ommith
);
return
-
1
;
}
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
));
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
));
}
else
{
// Has memory data to commit
SDFileSet
*
pCSet
;
...
...
@@ -156,22 +199,30 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// Commit to an existing FSET
pCSet
=
pSet
;
cfid
=
pSet
->
fid
;
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
));
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
));
}
if
(
tsdbCommitToFile
(
&
c
ommith
,
pCSet
,
cfid
)
<
0
)
{
tsdbDestroyCommitH
(
&
c
ommith
);
if
(
tsdbCommitToFile
(
pC
ommith
,
pCSet
,
cfid
)
<
0
)
{
tsdbDestroyCommitH
(
pC
ommith
);
return
-
1
;
}
fid
=
tsdbNextCommitFid
(
&
c
ommith
);
fid
=
tsdbNextCommitFid
(
pC
ommith
);
}
}
// end commit
tsdbDestroyCommitH
(
&
commith
);
tsdbEndCommit
(
pTsdb
,
TSDB_CODE_SUCCESS
);
return
code
;
}
static
int32_t
tsdbCommitDel
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
...
...
@@ -216,16 +267,6 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
return
0
;
}
// int tsdbPrepareCommit(STsdb *pTsdb) {
// if (pTsdb->mem == NULL) return 0;
// ASSERT(pTsdb->imem == NULL);
// pTsdb->imem = pTsdb->mem;
// pTsdb->mem = NULL;
// return 0;
// }
void
tsdbGetRtnSnap
(
STsdb
*
pRepo
,
SRtn
*
pRtn
)
{
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
TSKEY
minKey
,
midKey
,
maxKey
,
now
;
...
...
@@ -243,19 +284,32 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
pRtn
->
minFid
,
pRtn
->
midFid
,
pRtn
->
maxFid
);
}
static
void
tsdbStartCommit
(
STsdb
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
tsdbInfo
(
"vgId:%d, start to commit"
,
REPO_ID
(
pTsdb
));
if
(
tsdbInitCommitH
(
pCHandle
,
pTsdb
)
<
0
)
{
return
-
1
;
}
tsdb
Info
(
"vgId:%d, start to commit"
,
REPO_ID
(
pRepo
)
);
tsdb
StartFSTxn
(
pTsdb
,
0
,
0
);
tsdbStartFSTxn
(
pRepo
,
0
,
0
)
;
return
code
;
}
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
)
{
static
int32_t
tsdbEndCommit
(
SCommitH
*
pCHandle
,
int
eno
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
TSDB_COMMIT_REPO
(
pCHandle
);
tsdbDestroyCommitH
(
pCHandle
);
tsdbEndFSTxn
(
pTsdb
);
tsdbMemTableDestroy
(
pTsdb
->
imem
);
pTsdb
->
imem
=
NULL
;
tsdbInfo
(
"vgId:%d, commit over, %s"
,
REPO_ID
(
pTsdb
),
(
eno
==
TSDB_CODE_SUCCESS
)
?
"succeed"
:
"failed"
);
return
code
;
}
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
)
{
...
...
@@ -354,34 +408,73 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
));
}
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
static
int32_t
tsdbCommitToFileStart
(
SCommitH
*
pCHandle
,
SDFileSet
*
pSet
,
int32_t
fid
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCHandle
);
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
ASSERT
(
pSet
==
NULL
||
pSet
->
fid
==
fid
);
tsdbResetCommitFile
(
pCommith
);
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
fid
,
&
(
pCommith
->
minKey
),
&
(
pCommith
->
maxKey
));
pCHandle
->
fid
=
fid
;
pCHandle
->
pSet
=
pSet
;
pCHandle
->
isRFileSet
=
false
;
pCHandle
->
isDFileSame
=
false
;
pCHandle
->
isLFileSame
=
false
;
taosArrayClear
(
pCHandle
->
aBlkIdx
);
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
fid
,
&
(
pCHandle
->
minKey
),
&
(
pCHandle
->
maxKey
));
code
=
tsdbSetAndOpenCommitFile
(
pCHandle
,
pSet
,
fid
);
// Set and open files
if
(
tsdbSetAndOpenCommitFile
(
pCommith
,
pSet
,
fid
)
<
0
)
{
return
code
;
}
static
int32_t
tsdbCommitToFileImpl
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitToFileEnd
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
tsdbError
(
"vgId:%d, failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
pCommith
->
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pCommith
->
pSet
);
return
-
1
;
}
#if 0
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
if (pIter->pTable == NULL) continue;
if
(
tsdbUpdateDFileSetHeader
(
&
(
pCommith
->
wSet
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to update FSET %d header since %s"
,
REPO_ID
(
pRepo
),
pCommith
->
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pCommith
->
pSet
);
return
-
1
;
}
if (tsdbCommitToTable(pCommith, tid) < 0) {
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
// Close commit file
tsdbCloseCommitFile
(
pCommith
,
false
);
if
(
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
&
(
pCommith
->
wSet
))
<
0
)
{
return
-
1
;
}
#endif
return
code
;
}
static
int32_t
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
// commit to file start
code
=
tsdbCommitToFileStart
(
pCommith
,
pSet
,
fid
);
if
(
code
)
{
goto
_err
;
}
// Loop to commit each table data in mem and file
int
mIter
=
0
,
fIter
=
0
;
int
nBlkIdx
=
taosArrayGetSize
(
pCommith
->
readh
.
aBlkIdx
);
...
...
@@ -426,31 +519,16 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
tsdbError
(
"vgId:%d, failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
}
if
(
tsdbUpdateDFileSetHeader
(
&
(
pCommith
->
wSet
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to update FSET %d header since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
// commit to file end
code
=
tsdbCommitToFileEnd
(
pCommith
);
if
(
code
)
{
goto
_err
;
}
// Close commit file
tsdbCloseCommitFile
(
pCommith
,
false
);
if
(
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
&
(
pCommith
->
wSet
))
<
0
)
{
return
-
1
;
}
return
code
;
return
0
;
_err:
return
code
;
}
static
int32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
...
...
@@ -507,13 +585,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
pCommith
->
niters
=
0
;
}
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
)
{
pCommith
->
isRFileSet
=
false
;
pCommith
->
isDFileSame
=
false
;
pCommith
->
isLFileSame
=
false
;
taosArrayClear
(
pCommith
->
aBlkIdx
);
}
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
SDiskID
did
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
...
...
@@ -1590,4 +1661,171 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
}
return
false
;
}
static
int
tsdbAppendTableRowToCols
(
STsdb
*
pTsdb
,
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
STSRow
*
row
,
bool
merge
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
TD_ROW_SVER
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTsdb
,
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendSTSRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
merge
);
}
return
0
;
}
static
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
// only fetch lastKey from mem data as file data not used in this function actually
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
STSRow
*
row
=
NULL
;
SMergeInfo
mInfo
;
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
// query handle)
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
// 2. rowKey - would dup since Multi-Version supported
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
#if 1
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
// TODO: support delete function
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
if
(
keepDup
)
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
else
{
// discard
}
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
}
else
{
// fkey == rowKey
if
(
isRowDel
)
{
// TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
}
else
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
#endif
}
if
(
pCols
&&
(
lastKey
!=
TSKEY_INITIAL_VAL
))
{
++
pCols
->
numOfRows
;
}
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbCommit2.c
已删除
100644 → 0
浏览文件 @
ea05f8af
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef
struct
{
SMemTable2
*
pMemTable
;
int32_t
minutes
;
int8_t
precision
;
TSKEY
nCommitKey
;
int32_t
fid
;
TSKEY
minKey
;
TSKEY
maxKey
;
SReadH
readh
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SArray
*
aSubBlk
;
SArray
*
aDelInfo
;
}
SCommitH
;
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
);
static
int32_t
tsdbCommitEnd
(
SCommitH
*
pCHandle
);
static
int32_t
tsdbCommitImpl
(
SCommitH
*
pCHandle
);
int32_t
tsdbBegin2
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
ASSERT
(
pTsdb
->
mem
==
NULL
);
code
=
tsdbMemTableCreate2
(
pTsdb
,
(
SMemTable2
**
)
&
pTsdb
->
mem
);
if
(
code
)
{
tsdbError
(
"vgId:%d failed to begin TSDB since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
_exit:
return
code
;
}
int32_t
tsdbCommit2
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
SCommitH
ch
=
{
0
};
// start to commit
code
=
tsdbCommitStart
(
&
ch
,
pTsdb
);
if
(
code
)
{
goto
_exit
;
}
// commit
code
=
tsdbCommitImpl
(
&
ch
);
if
(
code
)
{
goto
_err
;
}
// end commit
code
=
tsdbCommitEnd
(
&
ch
);
if
(
code
)
{
goto
_exit
;
}
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
tsdbInfo
(
"vgId:%d start to commit"
,
TD_VID
(
pTsdb
->
pVnode
));
// switch to commit
ASSERT
(
pTsdb
->
imem
==
NULL
&&
pTsdb
->
mem
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
// open handle
pCHandle
->
pMemTable
=
pMemTable
;
pCHandle
->
minutes
=
pTsdb
->
keepCfg
.
days
;
pCHandle
->
precision
=
pTsdb
->
keepCfg
.
precision
;
pCHandle
->
nCommitKey
=
pMemTable
->
minKey
.
ts
;
code
=
tsdbInitReadH
(
&
pCHandle
->
readh
,
pTsdb
);
if
(
code
)
{
goto
_err
;
}
pCHandle
->
aBlkIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCHandle
->
aBlkIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aSupBlk
=
taosArrayInit
(
0
,
sizeof
(
SBlock
));
if
(
pCHandle
->
aSupBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aSubBlk
=
taosArrayInit
(
0
,
sizeof
(
SBlock
));
if
(
pCHandle
->
aSubBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aDelInfo
=
taosArrayInit
(
0
,
sizeof
(
SDelInfo
));
if
(
pCHandle
->
aDelInfo
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
// start FS transaction
tsdbStartFSTxn
(
pTsdb
,
0
,
0
);
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
imem
;
// end transaction
code
=
tsdbEndFSTxn
(
pTsdb
);
if
(
code
)
{
goto
_err
;
}
// close handle
taosArrayClear
(
pCHandle
->
aDelInfo
);
taosArrayClear
(
pCHandle
->
aSubBlk
);
taosArrayClear
(
pCHandle
->
aSupBlk
);
taosArrayClear
(
pCHandle
->
aBlkIdx
);
tsdbDestroyReadH
(
&
pCHandle
->
readh
);
// destroy memtable (todo: unref it)
pTsdb
->
imem
=
NULL
;
tsdbMemTableDestroy2
(
pMemTable
);
tsdbInfo
(
"vgId:%d commit over"
,
TD_VID
(
pTsdb
->
pVnode
));
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitTableStart
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitTableEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitTable
(
SCommitH
*
pCHandle
,
SMemData
*
pMemData
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
code
=
0
;
SMemDataIter
iter
=
{
0
};
// commit table start
code
=
tsdbCommitTableStart
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit table impl
if
(
pMemData
&&
pBlockIdx
)
{
// TODO
}
else
if
(
pMemData
)
{
// TODO
}
else
{
// TODO
}
// commit table end
code
=
tsdbCommitTableEnd
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbTableIdCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
TABLEID
*
pId1
=
(
TABLEID
*
)
p1
;
TABLEID
*
pId2
=
(
TABLEID
*
)
p2
;
if
(
pId1
->
suid
<
pId2
->
suid
)
{
return
-
1
;
}
else
if
(
pId1
->
suid
>
pId2
->
suid
)
{
return
1
;
}
if
(
pId1
->
uid
<
pId2
->
uid
)
{
return
-
1
;
}
else
if
(
pId1
->
uid
>
pId2
->
uid
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbWriteBlockIdx
(
SDFile
*
pFile
,
SArray
*
pArray
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitFileStart
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SDFileSet
*
pSet
=
NULL
;
taosArrayClear
(
pCHandle
->
aBlkIdx
);
return
code
;
}
static
int32_t
tsdbCommitFileEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitFile
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
SMemData
*
pMemData
;
SBlockIdx
*
pBlockIdx
;
int32_t
iMemData
;
int32_t
nMemData
;
int32_t
iBlockIdx
;
int32_t
nBlockIdx
;
// commit file start
code
=
tsdbCommitFileStart
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit file impl
iMemData
=
0
;
nMemData
=
taosArrayGetSize
(
pCHandle
->
pMemTable
->
aMemData
);
iBlockIdx
=
0
;
nBlockIdx
=
0
;
// todo
for
(;;)
{
if
(
iMemData
>=
nMemData
&&
iBlockIdx
>=
nBlockIdx
)
break
;
pMemData
=
NULL
;
pBlockIdx
=
NULL
;
if
(
iMemData
<
nMemData
)
{
pMemData
=
(
SMemData
*
)
taosArrayGetP
(
pCHandle
->
pMemTable
->
aMemData
,
iMemData
);
}
if
(
iBlockIdx
<
nBlockIdx
)
{
// pBlockIdx = ;
}
if
(
pMemData
&&
pBlockIdx
)
{
int32_t
c
=
tsdbTableIdCmprFn
(
pMemData
,
pBlockIdx
);
if
(
c
<
0
)
{
iMemData
++
;
pBlockIdx
=
NULL
;
}
else
if
(
c
==
0
)
{
iMemData
++
;
iBlockIdx
++
;
}
else
{
iBlockIdx
++
;
pMemData
=
NULL
;
}
}
else
{
if
(
pMemData
)
{
iMemData
++
;
}
else
{
iBlockIdx
++
;
}
}
code
=
tsdbCommitTable
(
pCHandle
,
pMemData
,
pBlockIdx
);
if
(
code
)
{
goto
_err
;
}
}
// commit file end
code
=
tsdbCommitFileEnd
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitData
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
int32_t
fid
;
if
(
pCHandle
->
pMemTable
->
nRows
==
0
)
goto
_exit
;
// loop to commit to each file
for
(;;)
{
if
(
pCHandle
->
nCommitKey
==
TSKEY_MAX
)
break
;
pCHandle
->
fid
=
TSDB_KEY_FID
(
pCHandle
->
nCommitKey
,
pCHandle
->
minutes
,
pCHandle
->
precision
);
tsdbGetFidKeyRange
(
pCHandle
->
minutes
,
pCHandle
->
precision
,
pCHandle
->
fid
,
&
pCHandle
->
minKey
,
&
pCHandle
->
maxKey
);
code
=
tsdbCommitFile
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
}
_exit:
return
code
;
_err:
return
code
;
}
static
int32_t
delInfoCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SDelInfo
*
pDelInfo1
=
(
SDelInfo
*
)
p1
;
SDelInfo
*
pDelInfo2
=
(
SDelInfo
*
)
p2
;
if
(
pDelInfo1
->
suid
<
pDelInfo2
->
suid
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
suid
>
pDelInfo2
->
suid
)
{
return
1
;
}
if
(
pDelInfo1
->
uid
<
pDelInfo2
->
uid
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
uid
>
pDelInfo2
->
uid
)
{
return
1
;
}
if
(
pDelInfo1
->
version
<
pDelInfo2
->
version
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
version
>
pDelInfo2
->
version
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbCommitDelete
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
SDelInfo
delInfo
;
SMemData
*
pMemData
;
if
(
pCHandle
->
pMemTable
->
nDelOp
==
0
)
goto
_exit
;
// load del array (todo)
// loop to append SDelInfo
for
(
int32_t
iMemData
=
0
;
iMemData
<
taosArrayGetSize
(
pCHandle
->
pMemTable
->
aMemData
);
iMemData
++
)
{
pMemData
=
(
SMemData
*
)
taosArrayGetP
(
pCHandle
->
pMemTable
->
aMemData
,
iMemData
);
for
(
SDelOp
*
pDelOp
=
pMemData
->
delOpHead
;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
delInfo
=
(
SDelInfo
){.
suid
=
pMemData
->
suid
,
.
uid
=
pMemData
->
uid
,
.
version
=
pDelOp
->
version
,
.
sKey
=
pDelOp
->
sKey
,
.
eKey
=
pDelOp
->
eKey
};
if
(
taosArrayPush
(
pCHandle
->
aDelInfo
,
&
delInfo
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
}
taosArraySort
(
pCHandle
->
aDelInfo
,
delInfoCmprFn
);
// write to new file
_exit:
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitImpl
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// commit data
code
=
tsdbCommitData
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit delete
code
=
tsdbCommitDelete
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit cache if need (todo)
if
(
0
)
{
code
=
tsdbCommitCache
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
}
return
code
;
_err:
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
ca2d1f8b
...
...
@@ -188,23 +188,6 @@ _err:
return
code
;
}
static
int
tsdbAppendTableRowToCols
(
STsdb
*
pTsdb
,
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
STSRow
*
row
,
bool
merge
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
TD_ROW_SVER
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTsdb
,
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendSTSRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
merge
);
}
return
0
;
}
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
)
{
int32_t
code
=
0
;
...
...
@@ -310,166 +293,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) {
return
true
;
}
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
// only fetch lastKey from mem data as file data not used in this function actually
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
STSRow
*
row
=
NULL
;
SMergeInfo
mInfo
;
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
// query handle)
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
// 2. rowKey - would dup since Multi-Version supported
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
#if 1
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
// TODO: support delete function
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
if
(
keepDup
)
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
else
{
// discard
}
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
}
else
{
// fkey == rowKey
if
(
isRowDel
)
{
// TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
}
else
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
#endif
}
if
(
pCols
&&
(
lastKey
!=
TSKEY_INITIAL_VAL
))
{
++
pCols
->
numOfRows
;
}
return
0
;
}
static
int32_t
tsdbGetOrCreateTbData
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
)
{
int32_t
code
=
0
;
int32_t
idx
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
已删除
100644 → 0
浏览文件 @
ea05f8af
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef
struct
{
tb_uid_t
uid
;
STSchema
*
pTSchema
;
}
SSkmInfo
;
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
);
static
int
memDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
static
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int8_t
tsdbMemSkipListRandLevel
(
SMemSkipList
*
pSl
);
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
);
static
void
memDataMovePosTo
(
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBKEY
*
pKey
,
int32_t
flags
);
// SMemTable ==============================================
int32_t
tsdbMemTableCreate2
(
STsdb
*
pTsdb
,
SMemTable2
**
ppMemTable
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
NULL
;
pMemTable
=
(
SMemTable2
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pMemTable
));
if
(
pMemTable
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pMemTable
->
pTsdb
=
pTsdb
;
pMemTable
->
nRef
=
1
;
pMemTable
->
minKey
=
(
TSDBKEY
){.
version
=
INT64_MAX
,
.
ts
=
TSKEY_MAX
};
pMemTable
->
maxKey
=
(
TSDBKEY
){.
version
=
-
1
,
.
ts
=
TSKEY_MIN
};
pMemTable
->
nRows
=
0
;
pMemTable
->
nDelOp
=
0
;
pMemTable
->
aMemData
=
taosArrayInit
(
512
,
sizeof
(
SMemData
*
));
if
(
pMemTable
->
aMemData
==
NULL
)
{
taosMemoryFree
(
pMemTable
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
*
ppMemTable
=
pMemTable
;
return
code
;
_err:
*
ppMemTable
=
NULL
;
return
code
;
}
void
tsdbMemTableDestroy2
(
SMemTable2
*
pMemTable
)
{
taosArrayDestroyEx
(
pMemTable
->
aMemData
,
NULL
/*TODO*/
);
taosMemoryFree
(
pMemTable
);
}
int32_t
tsdbInsertTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
TSDBROW
row
=
{.
version
=
version
};
ASSERT
(
pMemTable
);
ASSERT
(
pSubmitBlk
->
nData
>
0
);
{
// check if table exists (todo)
}
code
=
tsdbGetOrCreateMemData
(
pMemTable
,
pSubmitBlk
->
suid
,
pSubmitBlk
->
uid
,
&
pMemData
);
if
(
code
)
{
tsdbError
(
"vgId:%d, failed to create/get table data since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
goto
_err
;
}
// do insert
code
=
tsdbInsertTableDataImpl
(
pMemTable
,
pMemData
,
version
,
pSubmitBlk
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
int32_t
tsdbDeleteTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
SVBufPool
*
pPool
=
pTsdb
->
pVnode
->
inUse
;
ASSERT
(
pMemTable
);
{
// check if table exists (todo)
}
code
=
tsdbGetOrCreateMemData
(
pMemTable
,
suid
,
uid
,
&
pMemData
);
if
(
code
)
{
goto
_err
;
}
// do delete
SDelOp
*
pDelOp
=
(
SDelOp
*
)
vnodeBufPoolMalloc
(
pPool
,
sizeof
(
*
pDelOp
));
if
(
pDelOp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pDelOp
->
version
=
version
;
pDelOp
->
sKey
=
sKey
;
pDelOp
->
eKey
=
eKey
;
pDelOp
->
pNext
=
NULL
;
if
(
pMemData
->
delOpHead
==
NULL
)
{
ASSERT
(
pMemData
->
delOpTail
==
NULL
);
pMemData
->
delOpHead
=
pMemData
->
delOpTail
=
pDelOp
;
}
else
{
pMemData
->
delOpTail
->
pNext
=
pDelOp
;
pMemData
->
delOpTail
=
pDelOp
;
}
{
// update the state of pMemTable, pMemData, last and lastrow (todo)
}
pMemTable
->
nDelOp
++
;
tsdbDebug
(
"vgId:%d, delete data from table suid:%"
PRId64
" uid:%"
PRId64
" sKey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
_err:
tsdbError
(
"vgId:%d, failed to delete data from table suid:%"
PRId64
" uid:%"
PRId64
" sKey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
}
void
tsdbMemDataIterOpen
(
SMemData
*
pMemData
,
TSDBKEY
*
pKey
,
int8_t
backward
,
SMemDataIter
*
pIter
)
{
SMemSkipListNode
*
pos
[
SL_MAX_LEVEL
];
pIter
->
pMemData
=
pMemData
;
pIter
->
backward
=
backward
;
pIter
->
pRow
=
NULL
;
if
(
pKey
==
NULL
)
{
// create from head or tail
if
(
backward
)
{
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pTail
,
0
);
}
else
{
pIter
->
pNode
=
SL_NODE_FORWARD
(
pMemData
->
sl
.
pHead
,
0
);
}
}
else
{
// create from a key
if
(
backward
)
{
memDataMovePosTo
(
pMemData
,
pos
,
pKey
,
SL_MOVE_BACKWARD
);
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pos
[
0
],
0
);
}
else
{
memDataMovePosTo
(
pMemData
,
pos
,
pKey
,
0
);
pIter
->
pNode
=
SL_NODE_FORWARD
(
pos
[
0
],
0
);
}
}
}
bool
tsdbMemDataIterNext
(
SMemDataIter
*
pIter
)
{
SMemSkipListNode
*
pHead
=
pIter
->
pMemData
->
sl
.
pHead
;
SMemSkipListNode
*
pTail
=
pIter
->
pMemData
->
sl
.
pTail
;
pIter
->
pRow
=
NULL
;
if
(
pIter
->
backward
)
{
ASSERT
(
pIter
->
pNode
!=
pTail
);
if
(
pIter
->
pNode
==
pHead
)
{
return
false
;
}
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pIter
->
pNode
,
0
);
if
(
pIter
->
pNode
==
pHead
)
{
return
false
;
}
}
else
{
ASSERT
(
pIter
->
pNode
!=
pHead
);
if
(
pIter
->
pNode
==
pTail
)
{
return
false
;
}
pIter
->
pNode
=
SL_NODE_FORWARD
(
pIter
->
pNode
,
0
);
if
(
pIter
->
pNode
==
pTail
)
{
return
false
;
}
}
return
true
;
}
void
tsdbMemDataIterGet
(
SMemDataIter
*
pIter
,
TSDBROW
**
ppRow
)
{
if
(
pIter
->
pRow
)
{
*
ppRow
=
pIter
->
pRow
;
}
else
{
SMemSkipListNode
*
pHead
=
pIter
->
pMemData
->
sl
.
pHead
;
SMemSkipListNode
*
pTail
=
pIter
->
pMemData
->
sl
.
pTail
;
if
(
pIter
->
backward
)
{
ASSERT
(
pIter
->
pNode
!=
pTail
);
if
(
pIter
->
pNode
==
pHead
)
{
*
ppRow
=
NULL
;
}
else
{
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
*
ppRow
=
&
pIter
->
row
;
}
}
else
{
ASSERT
(
pIter
->
pNode
!=
pHead
);
if
(
pIter
->
pNode
==
pTail
)
{
*
ppRow
=
NULL
;
}
else
{
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
*
ppRow
=
&
pIter
->
row
;
}
}
}
}
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
)
{
int32_t
code
=
0
;
int32_t
idx
=
0
;
SMemData
*
pMemDataT
=
&
(
SMemData
){.
suid
=
suid
,
.
uid
=
uid
};
SMemData
*
pMemData
=
NULL
;
SVBufPool
*
pPool
=
pMemTable
->
pTsdb
->
pVnode
->
inUse
;
int8_t
maxLevel
=
pMemTable
->
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
slLevel
;
// get
idx
=
taosArraySearchIdx
(
pMemTable
->
aMemData
,
&
pMemDataT
,
memDataPCmprFn
,
TD_GE
);
if
(
idx
>=
0
)
{
pMemData
=
(
SMemData
*
)
taosArrayGet
(
pMemTable
->
aMemData
,
idx
);
if
(
memDataPCmprFn
(
&
pMemDataT
,
&
pMemData
)
==
0
)
goto
_exit
;
}
// create
pMemData
=
vnodeBufPoolMalloc
(
pPool
,
sizeof
(
*
pMemData
)
+
SL_NODE_SIZE
(
maxLevel
)
*
2
);
if
(
pMemData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pMemData
->
suid
=
suid
;
pMemData
->
uid
=
uid
;
pMemData
->
minKey
=
(
TSDBKEY
){.
version
=
INT64_MAX
,
.
ts
=
TSKEY_MAX
};
pMemData
->
maxKey
=
(
TSDBKEY
){.
version
=
-
1
,
.
ts
=
TSKEY_MIN
};
pMemData
->
delOpHead
=
pMemData
->
delOpTail
=
NULL
;
pMemData
->
sl
.
seed
=
taosRand
();
pMemData
->
sl
.
size
=
0
;
pMemData
->
sl
.
maxLevel
=
maxLevel
;
pMemData
->
sl
.
level
=
0
;
pMemData
->
sl
.
pHead
=
(
SMemSkipListNode
*
)
&
pMemData
[
1
];
pMemData
->
sl
.
pTail
=
(
SMemSkipListNode
*
)
POINTER_SHIFT
(
pMemData
->
sl
.
pHead
,
SL_NODE_SIZE
(
maxLevel
));
pMemData
->
sl
.
pHead
->
level
=
maxLevel
;
pMemData
->
sl
.
pTail
->
level
=
maxLevel
;
for
(
int8_t
iLevel
=
0
;
iLevel
<
pMemData
->
sl
.
maxLevel
;
iLevel
++
)
{
SL_NODE_FORWARD
(
pMemData
->
sl
.
pHead
,
iLevel
)
=
pMemData
->
sl
.
pTail
;
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pHead
,
iLevel
)
=
NULL
;
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pTail
,
iLevel
)
=
pMemData
->
sl
.
pHead
;
SL_NODE_FORWARD
(
pMemData
->
sl
.
pTail
,
iLevel
)
=
NULL
;
}
if
(
idx
<
0
)
idx
=
0
;
if
(
taosArrayInsert
(
pMemTable
->
aMemData
,
idx
,
&
pMemData
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
_exit:
*
ppMemData
=
pMemData
;
return
code
;
_err:
*
ppMemData
=
NULL
;
return
code
;
}
static
int
memDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SMemData
*
pMemData1
=
*
(
SMemData
**
)
p1
;
SMemData
*
pMemData2
=
*
(
SMemData
**
)
p2
;
if
(
pMemData1
->
suid
<
pMemData2
->
suid
)
{
return
-
1
;
}
else
if
(
pMemData1
->
suid
>
pMemData2
->
suid
)
{
return
1
;
}
if
(
pMemData1
->
uid
<
pMemData2
->
uid
)
{
return
-
1
;
}
else
if
(
pMemData1
->
uid
>
pMemData2
->
uid
)
{
return
1
;
}
return
0
;
}
static
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pRow
->
version
);
n
+=
tPutTSRow
(
p
?
p
+
n
:
p
,
&
pRow
->
tsRow
);
return
n
;
}
static
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tGetI64
(
p
+
n
,
&
pRow
->
version
);
n
+=
tGetTSRow
(
p
+
n
,
&
pRow
->
tsRow
);
return
n
;
}
static
FORCE_INLINE
int8_t
tsdbMemSkipListRandLevel
(
SMemSkipList
*
pSl
)
{
int8_t
level
=
1
;
int8_t
tlevel
=
TMIN
(
pSl
->
maxLevel
,
pSl
->
level
+
1
);
const
uint32_t
factor
=
4
;
while
((
taosRandR
(
&
pSl
->
seed
)
%
factor
)
==
0
&&
level
<
tlevel
)
{
level
++
;
}
return
level
;
}
static
void
memDataMovePosTo
(
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBKEY
*
pKey
,
int32_t
flags
)
{
SMemSkipListNode
*
px
;
SMemSkipListNode
*
pn
;
TSDBKEY
*
pTKey
;
int
c
;
int
backward
=
flags
&
SL_MOVE_BACKWARD
;
int
fromPos
=
flags
&
SL_MOVE_FROM_POS
;
if
(
backward
)
{
px
=
pMemData
->
sl
.
pTail
;
for
(
int8_t
iLevel
=
pMemData
->
sl
.
maxLevel
-
1
;
iLevel
>=
pMemData
->
sl
.
level
;
iLevel
--
)
{
pos
[
iLevel
]
=
px
;
}
if
(
pMemData
->
sl
.
level
)
{
if
(
fromPos
)
px
=
pos
[
pMemData
->
sl
.
level
-
1
];
for
(
int8_t
iLevel
=
pMemData
->
sl
.
level
-
1
;
iLevel
>=
0
;
iLevel
--
)
{
pn
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
while
(
pn
!=
pMemData
->
sl
.
pHead
)
{
pTKey
=
(
TSDBKEY
*
)
SL_NODE_DATA
(
pn
);
c
=
tsdbKeyCmprFn
(
pTKey
,
pKey
);
if
(
c
<=
0
)
{
break
;
}
else
{
px
=
pn
;
pn
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
}
}
pos
[
iLevel
]
=
px
;
}
}
}
else
{
px
=
pMemData
->
sl
.
pHead
;
for
(
int8_t
iLevel
=
pMemData
->
sl
.
maxLevel
-
1
;
iLevel
>=
pMemData
->
sl
.
level
;
iLevel
--
)
{
pos
[
iLevel
]
=
px
;
}
if
(
pMemData
->
sl
.
level
)
{
if
(
fromPos
)
px
=
pos
[
pMemData
->
sl
.
level
-
1
];
for
(
int8_t
iLevel
=
pMemData
->
sl
.
level
-
1
;
iLevel
>=
0
;
iLevel
--
)
{
pn
=
SL_NODE_FORWARD
(
px
,
iLevel
);
while
(
pn
!=
pMemData
->
sl
.
pHead
)
{
pTKey
=
(
TSDBKEY
*
)
SL_NODE_DATA
(
pn
);
c
=
tsdbKeyCmprFn
(
pTKey
,
pKey
);
if
(
c
>=
0
)
{
break
;
}
else
{
px
=
pn
;
pn
=
SL_NODE_FORWARD
(
px
,
iLevel
);
}
}
pos
[
iLevel
]
=
px
;
}
}
}
}
static
int32_t
memDataDoPut
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBROW
*
pRow
,
int8_t
forward
)
{
int32_t
code
=
0
;
int8_t
level
;
SMemSkipListNode
*
pNode
;
SVBufPool
*
pPool
=
pMemTable
->
pTsdb
->
pVnode
->
inUse
;
// node
level
=
tsdbMemSkipListRandLevel
(
&
pMemData
->
sl
);
pNode
=
(
SMemSkipListNode
*
)
vnodeBufPoolMalloc
(
pPool
,
SL_NODE_SIZE
(
level
)
+
tPutTSDBRow
(
NULL
,
pRow
));
if
(
pNode
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
pNode
->
level
=
level
;
for
(
int8_t
iLevel
=
0
;
iLevel
<
level
;
iLevel
++
)
{
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
NULL
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
NULL
;
}
tPutTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pNode
),
pRow
);
// put
for
(
int8_t
iLevel
=
0
;
iLevel
<
pNode
->
level
;
iLevel
++
)
{
SMemSkipListNode
*
px
=
pos
[
iLevel
];
if
(
forward
)
{
SMemSkipListNode
*
pNext
=
SL_NODE_FORWARD
(
px
,
iLevel
);
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
pNext
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
px
;
SL_NODE_BACKWARD
(
pNext
,
iLevel
)
=
pNode
;
SL_NODE_FORWARD
(
px
,
iLevel
)
=
pNode
;
}
else
{
SMemSkipListNode
*
pPrev
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
px
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
pPrev
;
SL_NODE_FORWARD
(
pPrev
,
iLevel
)
=
pNode
;
SL_NODE_BACKWARD
(
px
,
iLevel
)
=
pNode
;
}
}
pMemData
->
sl
.
size
++
;
if
(
pMemData
->
sl
.
level
<
pNode
->
level
)
{
pMemData
->
sl
.
level
=
pNode
->
level
;
}
_exit:
return
code
;
}
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
int32_t
n
=
0
;
uint8_t
*
p
=
pSubmitBlk
->
pData
;
int32_t
nRow
=
0
;
TSDBROW
row
=
{.
version
=
version
};
SMemSkipListNode
*
pos
[
SL_MAX_LEVEL
];
ASSERT
(
pSubmitBlk
->
nData
);
// backward put first data
n
+=
tGetTSRow
(
p
+
n
,
&
row
.
tsRow
);
ASSERT
(
n
<=
pSubmitBlk
->
nData
);
memDataMovePosTo
(
pMemData
,
pos
,
&
(
TSDBKEY
){.
version
=
version
,
.
ts
=
row
.
tsRow
.
ts
},
SL_MOVE_BACKWARD
);
code
=
memDataDoPut
(
pMemTable
,
pMemData
,
pos
,
&
row
,
0
);
if
(
code
)
{
goto
_exit
;
}
nRow
++
;
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemData
->
minKey
)
<
0
)
{
pMemData
->
minKey
=
*
(
TSDBKEY
*
)
&
row
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemTable
->
minKey
)
<
0
)
{
pMemTable
->
minKey
=
*
(
TSDBKEY
*
)
&
row
;
}
// forward put rest
for
(
int8_t
iLevel
=
0
;
iLevel
<
pMemData
->
sl
.
maxLevel
;
iLevel
++
)
{
pos
[
iLevel
]
=
SL_NODE_BACKWARD
(
pos
[
iLevel
],
iLevel
);
}
while
(
n
<
pSubmitBlk
->
nData
)
{
n
+=
tGetTSRow
(
p
+
n
,
&
row
.
tsRow
);
ASSERT
(
n
<=
pSubmitBlk
->
nData
);
memDataMovePosTo
(
pMemData
,
pos
,
&
(
TSDBKEY
){.
version
=
version
,
.
ts
=
row
.
tsRow
.
ts
},
SL_MOVE_FROM_POS
);
code
=
memDataDoPut
(
pMemTable
,
pMemData
,
pos
,
&
row
,
1
);
if
(
code
)
{
goto
_exit
;
}
nRow
++
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemData
->
maxKey
)
>
0
)
{
pMemData
->
maxKey
=
*
(
TSDBKEY
*
)
&
row
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemTable
->
maxKey
)
>
0
)
{
pMemTable
->
maxKey
=
*
(
TSDBKEY
*
)
&
row
;
}
pMemTable
->
nRows
+=
nRow
;
_exit:
return
code
;
}
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
ca2d1f8b
...
...
@@ -196,33 +196,6 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
}
else
{
pReadh
->
pBlkIdx
=
(
SBlockIdx
*
)
p
;
}
// size_t size = taosArrayGetSize(pReadh->aBlkIdx);
// if (size > 0) {
// while (true) {
// if (pReadh->cidx >= size) {
// pReadh->pBlkIdx = NULL;
// break;
// }
// SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
// if (pBlkIdx->uid == TABLE_TID(pTable)) {
// if (pBlkIdx->uid == TABLE_UID(pTable)) {
// pReadh->pBlkIdx = pBlkIdx;
// } else {
// pReadh->pBlkIdx = NULL;
// }
// pReadh->cidx++;
// break;
// } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
// pReadh->pBlkIdx = NULL;
// break;
// } else {
// pReadh->cidx++;
// }
// }
// } else {
// pReadh->pBlkIdx = NULL;
// }
return
0
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
ca2d1f8b
...
...
@@ -1227,7 +1227,10 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
ASSERT
(
p1
);
if
(
!
p1
)
{
// window has been closed
return
;
}
doClearWindowImpl
(
p1
,
pSup
->
pResultBuf
,
pBinfo
,
numOfOutput
);
}
...
...
@@ -2202,12 +2205,12 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
}
int32_t
initBiasicInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SDiskbasedBuf
*
pResultBuf
)
{
int32_t
initBiasicInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
)
{
pBasicInfo
->
pCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pBasicInfo
->
rowCellInfoOffset
);
pBasicInfo
->
pRes
=
pResultBlock
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pBasicInfo
->
pCtx
[
i
].
pBuf
=
pResultBuf
;
pBasicInfo
->
pCtx
[
i
].
pBuf
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2237,16 +2240,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
initResultSizeInfo
(
pOperator
,
4096
);
code
=
init
SessionAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
code
=
init
BiasicInfo
(
&
pInfo
->
binfo
,
pExprInfo
,
numOfCols
,
pResBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
init
BiasicInfo
(
&
pInfo
->
binfo
,
pExprInfo
,
numOfCols
,
pResBlock
,
pInfo
->
streamAggSup
.
pResultBuf
);
pInfo
->
streamAggSup
.
resultRowSize
=
getResultRowSize
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
code
=
init
SessionAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
streamAggSup
.
resultRowSize
=
getResultRowSize
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pInfo
->
pDummyCtx
=
(
SqlFunctionCtx
*
)
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SqlFunctionCtx
));
if
(
pInfo
->
pDummyCtx
==
NULL
)
{
...
...
@@ -3101,6 +3103,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
int32_t
tsSlotId
=
((
SColumnNode
*
)
pStateNode
->
window
.
pTspk
)
->
slotId
;
SColumnNode
*
pColNode
=
(
SColumnNode
*
)((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
;
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SStreamStateAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -3121,17 +3124,18 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.
winMap
=
NULL
,
};
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
int32_t
code
=
initStateAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamStateAggOperatorInfo"
);
code
=
initBiasicInfo
(
&
pInfo
->
binfo
,
pExprInfo
,
numOfCols
,
pResBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initBiasicInfo
(
&
pInfo
->
binfo
,
pExprInfo
,
numOfCols
,
pResBlock
,
pInfo
->
streamAggSup
.
pResultBuf
);
pInfo
->
streamAggSup
.
resultRowSize
=
getResultRowSize
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
code
=
initStateAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamStateAggOperatorInfo"
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
streamAggSup
.
resultRowSize
=
getResultRowSize
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pInfo
->
pDummyCtx
=
(
SqlFunctionCtx
*
)
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SqlFunctionCtx
));
if
(
pInfo
->
pDummyCtx
==
NULL
)
{
goto
_error
;
...
...
source/libs/executor/src/tsort.c
浏览文件 @
ca2d1f8b
...
...
@@ -359,6 +359,11 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SSDataBlock
*
pLeftBlock
=
pLeftSource
->
src
.
pBlock
;
SSDataBlock
*
pRightBlock
=
pRightSource
->
src
.
pBlock
;
// first sort by block groupId
if
(
pLeftBlock
->
info
.
groupId
!=
pRightBlock
->
info
.
groupId
)
{
return
pLeftBlock
->
info
.
groupId
<
pRightBlock
->
info
.
groupId
?
-
1
:
1
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
size
;
++
i
)
{
SBlockOrderInfo
*
pOrder
=
TARRAY_GET_ELEM
(
pInfo
,
i
);
SColumnInfoData
*
pLeftColInfoData
=
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
slotId
);
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
ca2d1f8b
...
...
@@ -84,6 +84,7 @@ int32_t apercentileFunction(SqlFunctionCtx *pCtx);
int32_t
apercentileFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
apercentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
apercentilePartialFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
apercentileCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
int32_t
getApercentileMaxSize
();
bool
getDiffFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
ca2d1f8b
...
...
@@ -1293,7 +1293,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
getEnvFunc
=
getPercentileFuncEnv
,
.
initFunc
=
percentileFunctionSetup
,
.
processFunc
=
percentileFunction
,
.
finalizeFunc
=
percentileFinalize
.
finalizeFunc
=
percentileFinalize
,
.
invertFunc
=
NULL
,
.
combineFunc
=
NULL
,
},
{
.
name
=
"apercentile"
,
...
...
@@ -1304,6 +1306,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
initFunc
=
apercentileFunctionSetup
,
.
processFunc
=
apercentileFunction
,
.
finalizeFunc
=
apercentileFinalize
,
.
combineFunc
=
apercentileCombine
,
.
pPartialFunc
=
"_apercentile_partial"
,
.
pMergeFunc
=
"_apercentile_merge"
},
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
ca2d1f8b
...
...
@@ -2215,6 +2215,26 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return
pResInfo
->
numOfRes
;
}
int32_t
apercentileCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
)
{
SResultRowEntryInfo
*
pDResInfo
=
GET_RES_INFO
(
pDestCtx
);
SAPercentileInfo
*
pDBuf
=
GET_ROWCELL_INTERBUF
(
pDResInfo
);
int32_t
type
=
pDestCtx
->
input
.
pData
[
0
]
->
info
.
type
;
SResultRowEntryInfo
*
pSResInfo
=
GET_RES_INFO
(
pSourceCtx
);
SAPercentileInfo
*
pSBuf
=
GET_ROWCELL_INTERBUF
(
pSResInfo
);
ASSERT
(
pDBuf
->
algo
==
pSBuf
->
algo
);
if
(
pDBuf
->
algo
==
APERCT_ALGO_TDIGEST
)
{
tdigestMerge
(
pDBuf
->
pTDigest
,
pSBuf
->
pTDigest
);
}
else
{
SHistogramInfo
*
pTmp
=
tHistogramMerge
(
pDBuf
->
pHisto
,
pSBuf
->
pHisto
,
MAX_HISTOGRAM_BIN
);
memcpy
(
pDBuf
->
pHisto
,
pTmp
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pDBuf
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pDBuf
->
pHisto
+
sizeof
(
SHistogramInfo
));
tHistogramDestroy
(
&
pTmp
);
}
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
bool
getFirstLastFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SColumnNode
*
pNode
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
pEnv
->
calcMemSize
=
pNode
->
node
.
resType
.
bytes
+
sizeof
(
int64_t
);
...
...
source/util/src/talgo.c
浏览文件 @
ca2d1f8b
...
...
@@ -158,82 +158,48 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
taosMemoryFreeClear
(
buf
);
}
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int64_t
nmemb
,
int64_t
size
,
__compar_fn_t
compar
,
int32_t
flags
)
{
// TODO: need to check the correctness of this function
int32_t
l
=
0
;
int32_t
r
=
(
int32_t
)
nmemb
;
int32_t
idx
=
0
;
int32_t
comparison
;
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int32_t
nmemb
,
int32_t
size
,
__compar_fn_t
compar
,
int32_t
flags
)
{
uint8_t
*
p
;
int32_t
lidx
;
int32_t
ridx
;
int32_t
midx
;
int32_t
c
;
if
(
nmemb
<=
0
)
return
NULL
;
lidx
=
0
;
ridx
=
nmemb
-
1
;
while
(
lidx
<=
ridx
)
{
midx
=
(
lidx
+
ridx
)
/
2
;
p
=
(
uint8_t
*
)
base
+
size
*
midx
;
c
=
compar
(
key
,
p
);
if
(
c
==
0
)
{
break
;
}
else
if
(
c
<
0
)
{
ridx
=
midx
-
1
;
}
else
{
lidx
=
midx
+
1
;
}
}
if
(
flags
==
TD_EQ
)
{
return
bsearch
(
key
,
base
,
nmemb
,
size
,
compar
)
;
return
c
?
NULL
:
p
;
}
else
if
(
flags
==
TD_GE
)
{
if
(
nmemb
<=
0
)
return
NULL
;
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<=
0
)
return
elePtrAt
(
base
,
size
,
0
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
))
<
0
)
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
+
1
>
nmemb
-
1
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
+
1
);
}
}
return
(
c
<=
0
)
?
p
:
(
midx
+
1
<
nmemb
?
p
+
size
:
NULL
);
}
else
if
(
flags
==
TD_LE
)
{
if
(
nmemb
<=
0
)
return
NULL
;
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>=
0
)
return
elePtrAt
(
base
,
size
,
nmemb
-
1
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
))
>
0
)
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
==
0
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
-
1
);
}
}
return
(
c
>=
0
)
?
p
:
(
midx
>
0
?
p
-
size
:
NULL
);
}
else
{
assert
(
0
);
return
NULL
;
ASSERT
(
0
);
}
return
NULL
;
}
void
taosheapadjust
(
void
*
base
,
int32_t
size
,
int32_t
start
,
int32_t
end
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
)
{
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
)
{
int32_t
parent
;
int32_t
child
;
char
*
tmp
=
NULL
;
char
*
tmp
=
NULL
;
if
(
buf
==
NULL
)
{
tmp
=
taosMemoryMalloc
(
size
);
}
else
{
...
...
@@ -288,7 +254,7 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar,
bool
maxroot
)
{
int32_t
i
;
char
*
buf
=
taosMemoryCalloc
(
1
,
size
);
char
*
buf
=
taosMemoryCalloc
(
1
,
size
);
if
(
buf
==
NULL
)
{
return
;
}
...
...
source/util/test/CMakeLists.txt
浏览文件 @
ca2d1f8b
...
...
@@ -67,4 +67,12 @@ target_link_libraries(bloomFilterTest os util gtest_main)
add_test
(
NAME bloomFilterTest
COMMAND bloomFilterTest
)
# taosbsearchTest
add_executable
(
taosbsearchTest
"taosbsearchTest.cpp"
)
target_link_libraries
(
taosbsearchTest os util gtest_main
)
add_test
(
NAME taosbsearchTest
COMMAND taosbsearchTest
)
\ No newline at end of file
tests/test/c/tmqSim.c
浏览文件 @
ca2d1f8b
...
...
@@ -514,14 +514,14 @@ void* consumeThreadFunc(void* param) {
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
if
(
err
)
{
pError
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
pInfo
->
consumeMsgCnt
=
-
1
;
return
NULL
;
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
)
{
pError
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
/*exit(-1);*/
}
pInfo
->
tmq
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录