Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e05ef9a8
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e05ef9a8
编写于
5月 11, 2022
作者:
S
Shengliang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/check
上级
d94c8df3
61354eca
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
701 addition
and
142 deletion
+701
-142
cmake/addr2line_CMakeLists.txt.in
cmake/addr2line_CMakeLists.txt.in
+12
-0
cmake/cmake.options
cmake/cmake.options
+6
-0
cmake/libdwarf_CMakeLists.txt.in
cmake/libdwarf_CMakeLists.txt.in
+12
-0
contrib/CMakeLists.txt
contrib/CMakeLists.txt
+48
-1
include/common/tmsg.h
include/common/tmsg.h
+1
-1
include/os/osMemory.h
include/os/osMemory.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+12
-1
source/common/src/systable.c
source/common/src/systable.c
+2
-2
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+1
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+17
-5
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+14
-7
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+5
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+76
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+0
-1
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+61
-57
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+24
-22
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+39
-18
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+7
-8
source/libs/index/test/jsonUT.cc
source/libs/index/test/jsonUT.cc
+237
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+2
-2
source/os/CMakeLists.txt
source/os/CMakeLists.txt
+13
-3
source/os/src/osMemory.c
source/os/src/osMemory.c
+101
-3
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+7
-6
未找到文件。
cmake/addr2line_CMakeLists.txt.in
0 → 100644
浏览文件 @
e05ef9a8
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
cmake/cmake.options
浏览文件 @
e05ef9a8
...
...
@@ -48,6 +48,12 @@ IF(${TD_WINDOWS})
ENDIF ()
option(
BUILD_ADDR2LINE
"If build addr2line"
OFF
)
option(
BUILD_TEST
"If build unit tests using googletest"
...
...
cmake/libdwarf_CMakeLists.txt.in
0 → 100644
浏览文件 @
e05ef9a8
# libdwarf
ExternalProject_Add(libdwarf
GIT_REPOSITORY https://github.com/davea42/libdwarf-code.git
GIT_TAG libdwarf-0.3.1
SOURCE_DIR "${TD_CONTRIB_DIR}/libdwarf"
BINARY_DIR "${TD_CONTRIB_DIR}/libdwarf"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
contrib/CMakeLists.txt
浏览文件 @
e05ef9a8
...
...
@@ -98,6 +98,12 @@ if(${BUILD_WITH_NURAFT})
cat
(
"
${
TD_SUPPORT_DIR
}
/nuraft_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
endif
(
${
BUILD_WITH_NURAFT
}
)
# addr2line
if
(
${
BUILD_ADDR2LINE
}
)
cat
(
"
${
TD_SUPPORT_DIR
}
/libdwarf_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
cat
(
"
${
TD_SUPPORT_DIR
}
/addr2line_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
endif
(
${
BUILD_ADDR2LINE
}
)
# download dependencies
configure_file
(
${
CONTRIB_TMP_FILE
}
"
${
TD_CONTRIB_DIR
}
/deps-download/CMakeLists.txt"
)
execute_process
(
COMMAND
"
${
CMAKE_COMMAND
}
"
-G
"
${
CMAKE_GENERATOR
}
"
.
...
...
@@ -327,7 +333,48 @@ if(${BUILD_WITH_SQLITE})
endif
(
NOT TD_WINDOWS
)
endif
(
${
BUILD_WITH_SQLITE
}
)
# pthread
# addr2line
if
(
${
BUILD_ADDR2LINE
}
)
check_include_file
(
"sys/types.h"
HAVE_SYS_TYPES_H
)
check_include_file
(
"sys/stat.h"
HAVE_SYS_STAT_H
)
check_include_file
(
"inttypes.h"
HAVE_INTTYPES_H
)
check_include_file
(
"stddef.h"
HAVE_STDDEF_H
)
check_include_file
(
"stdlib.h"
HAVE_STDLIB_H
)
check_include_file
(
"string.h"
HAVE_STRING_H
)
check_include_file
(
"memory.h"
HAVE_MEMORY_H
)
check_include_file
(
"strings.h"
HAVE_STRINGS_H
)
check_include_file
(
"stdint.h"
HAVE_STDINT_H
)
check_include_file
(
"unistd.h"
HAVE_UNISTD_H
)
check_include_file
(
"sgidefs.h"
HAVE_SGIDEFS_H
)
check_include_file
(
"stdafx.h"
HAVE_STDAFX_H
)
check_include_file
(
"elf.h"
HAVE_ELF_H
)
check_include_file
(
"libelf.h"
HAVE_LIBELF_H
)
check_include_file
(
"libelf/libelf.h"
HAVE_LIBELF_LIBELF_H
)
check_include_file
(
"alloca.h"
HAVE_ALLOCA_H
)
check_include_file
(
"elfaccess.h"
HAVE_ELFACCESS_H
)
check_include_file
(
"sys/elf_386.h"
HAVE_SYS_ELF_386_H
)
check_include_file
(
"sys/elf_amd64.h"
HAVE_SYS_ELF_AMD64_H
)
check_include_file
(
"sys/elf_sparc.h"
HAVE_SYS_ELF_SPARC_H
)
check_include_file
(
"sys/ia64/elf.h"
HAVE_SYS_IA64_ELF_H
)
set
(
VERSION 0.3.1
)
set
(
PACKAGE_VERSION
"
\"
${
VERSION
}
\"
"
)
configure_file
(
libdwarf/cmake/config.h.cmake config.h
)
file
(
GLOB_RECURSE LIBDWARF_SOURCES
"libdwarf/src/lib/libdwarf/*.c"
)
add_library
(
libdwarf STATIC
${
LIBDWARF_SOURCES
}
)
set_target_properties
(
libdwarf PROPERTIES OUTPUT_NAME
"libdwarf"
)
if
(
HAVE_LIBELF_H OR HAVE_LIBELF_LIBELF_H
)
target_link_libraries
(
libdwarf PUBLIC libelf
)
endif
()
target_include_directories
(
libdwarf SYSTEM PUBLIC
"libdwarf/src/lib/libdwarf"
${
CMAKE_BINARY_DIR
}
/contrib
)
file
(
READ
"addr2line/addr2line.c"
ADDR2LINE_CONTENT
)
string
(
REPLACE
"static int"
"int"
ADDR2LINE_CONTENT
"
${
ADDR2LINE_CONTENT
}
"
)
string
(
REPLACE
"static void"
"void"
ADDR2LINE_CONTENT
"
${
ADDR2LINE_CONTENT
}
"
)
string
(
REPLACE
"main("
"main_addr2line("
ADDR2LINE_CONTENT
"
${
ADDR2LINE_CONTENT
}
"
)
file
(
WRITE
"addr2line/addr2line.c"
"
${
ADDR2LINE_CONTENT
}
"
)
add_library
(
addr2line STATIC
"addr2line/addr2line.c"
)
target_link_libraries
(
addr2line PUBLIC libdwarf dl z
)
target_include_directories
(
addr2line PUBLIC
"libdwarf/src/lib/libdwarf"
)
endif
(
${
BUILD_ADDR2LINE
}
)
# ================================================================================================
...
...
include/common/tmsg.h
浏览文件 @
e05ef9a8
...
...
@@ -1493,7 +1493,7 @@ typedef struct {
}
SMVSubscribeRsp
;
typedef
struct
{
char
name
[
TSDB_T
ABLE
_FNAME_LEN
];
char
name
[
TSDB_T
OPIC
_FNAME_LEN
];
int8_t
igNotExists
;
}
SMDropTopicReq
;
...
...
include/os/osMemory.h
浏览文件 @
e05ef9a8
...
...
@@ -35,6 +35,7 @@ void *taosMemoryRealloc(void *ptr, int32_t size);
void
*
taosMemoryStrDup
(
void
*
ptr
);
void
taosMemoryFree
(
void
*
ptr
);
int32_t
taosMemorySize
(
void
*
ptr
);
void
taosPrintBackTrace
();
#define taosMemoryFreeClear(ptr) \
do { \
...
...
source/client/src/tmq.c
浏览文件 @
e05ef9a8
...
...
@@ -1307,7 +1307,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
}
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
)
{
// TODO
if
(
tmq
->
status
==
TMQ_CONSUMER_STATUS__READY
)
{
tmq_list_t
*
lst
=
tmq_list_new
();
tmq_resp_err_t
rsp
=
tmq_subscribe
(
tmq
,
lst
);
tmq_list_destroy
(
lst
);
if
(
rsp
==
TMQ_RESP_ERR__SUCCESS
)
{
// TODO: free resources
return
TMQ_RESP_ERR__SUCCESS
;
}
else
{
return
TMQ_RESP_ERR__FAIL
;
}
}
// TODO: free resources
return
TMQ_RESP_ERR__SUCCESS
;
}
...
...
source/common/src/systable.c
浏览文件 @
e05ef9a8
...
...
@@ -262,7 +262,7 @@ static const SSysDbTableSchema topicSchema[] = {
static
const
SSysDbTableSchema
consumerSchema
[]
=
{
{.
name
=
"consumer_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"
group_id
"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"
consumer_group
"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"app_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"status"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"topics"
,
.
bytes
=
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
...
...
@@ -275,7 +275,7 @@ static const SSysDbTableSchema consumerSchema[] = {
static
const
SSysDbTableSchema
subscriptionSchema
[]
=
{
{.
name
=
"topic_name"
,
.
bytes
=
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"
group_id
"
,
.
bytes
=
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"
consumer_group
"
,
.
bytes
=
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"consumer_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
};
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
e05ef9a8
...
...
@@ -29,6 +29,7 @@ enum {
MQ_CONSUMER_STATUS__LOST
,
MQ_CONSUMER_STATUS__LOST_IN_REB
,
MQ_CONSUMER_STATUS__LOST_REBD
,
MQ_CONSUMER_STATUS__REMOVED
,
};
int32_t
mndInitConsumer
(
SMnode
*
pMnode
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
e05ef9a8
...
...
@@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
}
}
if
(
pConsumerOld
&&
taosArrayGetSize
(
pConsumerNew
->
rebNewTopics
)
==
0
&&
taosArrayGetSize
(
pConsumerNew
->
rebRemovedTopics
)
==
0
)
{
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto
SUBSCRIBE_OVER
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_SUBSCRIBE
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
SUBSCRIBE_OVER
;
...
...
@@ -789,6 +797,10 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pShow
->
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
taosArrayGetSize
(
pConsumer
->
assignedTopics
)
==
0
)
{
sdbRelease
(
pSdb
,
pConsumer
);
continue
;
}
taosRLockLatch
(
&
pConsumer
->
lock
);
...
...
@@ -810,12 +822,12 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
consumerId
,
false
);
//
group id
char
groupId
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
groupId
),
pConsumer
->
cgroup
,
TSDB_CGROUP_LEN
);
varDataSetLen
(
groupId
,
strlen
(
varDataVal
(
groupId
)));
//
consumer group
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
cgroup
),
pConsumer
->
cgroup
,
TSDB_CGROUP_LEN
);
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
groupId
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
cgroup
,
false
);
// app id
char
appId
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
e05ef9a8
...
...
@@ -171,14 +171,21 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
return
0
;
}
static
int32_t
mndSplitSubscribeKey
(
const
char
*
key
,
char
*
topic
,
char
*
cgroup
)
{
static
int32_t
mndSplitSubscribeKey
(
const
char
*
key
,
char
*
topic
,
char
*
cgroup
,
bool
fullName
)
{
int32_t
i
=
0
;
while
(
key
[
i
]
!=
TMQ_SEPARATOR
)
{
i
++
;
}
memcpy
(
cgroup
,
key
,
i
);
cgroup
[
i
]
=
0
;
strcpy
(
topic
,
&
key
[
i
+
1
]);
if
(
fullName
)
{
strcpy
(
topic
,
&
key
[
i
+
1
]);
}
else
{
while
(
key
[
i
]
!=
'.'
)
{
i
++
;
}
strcpy
(
topic
,
&
key
[
i
+
1
]);
}
return
0
;
}
...
...
@@ -426,7 +433,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__ADD
;
char
*
topic
=
taosMemoryCalloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pOutput
->
pSub
->
key
,
topic
,
cgroup
);
mndSplitSubscribeKey
(
pOutput
->
pSub
->
key
,
topic
,
cgroup
,
true
);
taosArrayPush
(
pConsumerNew
->
rebNewTopics
,
&
topic
);
mndReleaseConsumer
(
pMnode
,
pConsumerOld
);
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
{
...
...
@@ -444,7 +451,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__REMOVE
;
char
*
topic
=
taosMemoryCalloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pOutput
->
pSub
->
key
,
topic
,
cgroup
);
mndSplitSubscribeKey
(
pOutput
->
pSub
->
key
,
topic
,
cgroup
,
true
);
taosArrayPush
(
pConsumerNew
->
rebRemovedTopics
,
&
topic
);
mndReleaseConsumer
(
pMnode
,
pConsumerOld
);
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
{
...
...
@@ -494,7 +501,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// split sub key and extract topic
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pRebInfo
->
key
,
topic
,
cgroup
);
mndSplitSubscribeKey
(
pRebInfo
->
key
,
topic
,
cgroup
,
true
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
ASSERT
(
pTopic
);
taosRLockLatch
(
&
pTopic
->
lock
);
...
...
@@ -747,7 +754,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
// topic and cgroup
char
topic
[
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndSplitSubscribeKey
(
pSub
->
key
,
topic
,
cgroup
);
mndSplitSubscribeKey
(
pSub
->
key
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
false
);
varDataSetLen
(
topic
,
strlen
(
varDataVal
(
topic
)));
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
...
...
@@ -790,7 +797,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
// topic and cgroup
char
topic
[
TSDB_TOPIC_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
cgroup
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndSplitSubscribeKey
(
pSub
->
key
,
topic
,
cgroup
);
mndSplitSubscribeKey
(
pSub
->
key
,
varDataVal
(
topic
),
varDataVal
(
cgroup
),
false
);
varDataSetLen
(
topic
,
strlen
(
varDataVal
(
topic
)));
varDataSetLen
(
cgroup
,
strlen
(
varDataVal
(
cgroup
)));
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
e05ef9a8
...
...
@@ -682,7 +682,11 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
vnodeGid
[
i
].
dnodeId
,
false
);
char
buf1
[
20
]
=
{
0
};
const
char
*
role
=
syncStr
(
pVgroup
->
vnodeGid
[
i
].
role
);
SDnodeObj
*
pDnodeObj
=
mndAcquireDnode
(
pMnode
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
ASSERT
(
pDnodeObj
!=
NULL
);
bool
isOffLine
=
!
mndIsDnodeOnline
(
pMnode
,
pDnodeObj
,
taosGetTimestampMs
());
const
char
*
role
=
isOffLine
?
"OFFLINE"
:
syncStr
(
pVgroup
->
vnodeGid
[
i
].
role
);
STR_WITH_MAXSIZE_TO_VARSTR
(
buf1
,
role
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
e05ef9a8
...
...
@@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
);
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToTable
(
SCommitH
*
pCommith
,
int
tid
);
static
int
tsdbMoveBlkIdx
(
SCommitH
*
pCommith
,
SBlockIdx
*
pIdx
);
static
int
tsdbSetCommitTable
(
SCommitH
*
pCommith
,
STable
*
pTable
);
static
int
tsdbComparKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
);
...
...
@@ -349,7 +350,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
if
(
tsdbSetAndOpenCommitFile
(
pCommith
,
pSet
,
fid
)
<
0
)
{
return
-
1
;
}
#if 0
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
...
...
@@ -363,6 +364,46 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return -1;
}
}
#endif
// Loop to commit each table data in mem and file
int
mIter
=
0
,
fIter
=
0
;
int
nBlkIdx
=
taosArrayGetSize
(
pCommith
->
readh
.
aBlkIdx
);
while
(
true
)
{
SBlockIdx
*
pIdx
=
NULL
;
SCommitIter
*
pIter
=
NULL
;
if
(
mIter
<
pCommith
->
niters
)
{
pIter
=
pCommith
->
iters
+
mIter
;
if
(
fIter
<
nBlkIdx
)
{
pIdx
=
taosArrayGet
(
pCommith
->
readh
.
aBlkIdx
,
fIter
);
}
}
else
if
(
fIter
<
nBlkIdx
)
{
pIdx
=
taosArrayGet
(
pCommith
->
readh
.
aBlkIdx
,
fIter
);
}
else
{
break
;
}
if
(
pIter
&&
pIter
->
pTable
&&
(
!
pIdx
||
(
pIter
->
pTable
->
uid
<=
pIdx
->
uid
)))
{
if
(
tsdbCommitToTable
(
pCommith
,
mIter
)
<
0
)
{
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
}
if
(
pIdx
&&
(
pIter
->
pTable
->
uid
==
pIdx
->
uid
))
{
++
fIter
;
}
++
mIter
;
}
else
if
(
pIdx
)
{
if
(
tsdbMoveBlkIdx
(
pCommith
,
pIdx
)
<
0
)
{
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
}
++
fIter
;
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
...
...
@@ -838,6 +879,40 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
return
0
;
}
static
int
tsdbMoveBlkIdx
(
SCommitH
*
pCommith
,
SBlockIdx
*
pIdx
)
{
SReadH
*
pReadh
=
&
pCommith
->
readh
;
int
nBlocks
=
pIdx
->
numOfBlocks
;
int
bidx
=
0
;
tsdbResetCommitTable
(
pCommith
);
pReadh
->
pBlkIdx
=
pIdx
;
if
(
tsdbLoadBlockInfo
(
pReadh
,
NULL
)
<
0
)
{
return
-
1
;
}
while
(
bidx
<
nBlocks
)
{
if
(
tsdbMoveBlock
(
pCommith
,
bidx
)
<
0
)
{
tsdbError
(
"vgId:%d failed to move block into file %s since %s"
,
TSDB_COMMIT_REPO_ID
(
pCommith
),
TSDB_FILE_FULL_NAME
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
)),
tstrerror
(
terrno
));
return
-
1
;
}
++
bidx
;
}
STable
table
=
{.
tid
=
pIdx
->
uid
,
.
uid
=
pIdx
->
uid
,
.
pSchema
=
NULL
};
TSDB_COMMIT_TABLE
(
pCommith
)
=
&
table
;
if
(
tsdbWriteBlockInfo
(
pCommith
)
<
0
)
{
tsdbError
(
"vgId:%d failed to write SBlockInfo part into file %s since %s"
,
TSDB_COMMIT_REPO_ID
(
pCommith
),
TSDB_FILE_FULL_NAME
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
)),
tstrerror
(
terrno
));
return
-
1
;
}
return
0
;
}
static
int
tsdbSetCommitTable
(
SCommitH
*
pCommith
,
STable
*
pTable
)
{
STSchema
*
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
e05ef9a8
...
...
@@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
}
if
(
level
==
TSDB_RETENTION_L0
)
{
tsdbDebug
(
"%p rsma level %d is selected to query
\n
"
,
pReadHandle
,
level
);
tsdbDebug
(
"%p rsma level %d is selected to query
"
,
pReadHandle
,
TSDB_RETENTION_L0
);
return
VND_RSMA0
(
pVnode
);
}
else
if
(
level
==
TSDB_RETENTION_L1
)
{
tsdbDebug
(
"%p rsma level %d is selected to query
\n
"
,
pReadHandle
,
level
);
tsdbDebug
(
"%p rsma level %d is selected to query
"
,
pReadHandle
,
TSDB_RETENTION_L1
);
return
VND_RSMA1
(
pVnode
);
}
else
{
tsdbDebug
(
"%p rsma level %d is selected to query
\n
"
,
pReadHandle
,
level
);
tsdbDebug
(
"%p rsma level %d is selected to query
"
,
pReadHandle
,
TSDB_RETENTION_L2
);
return
VND_RSMA2
(
pVnode
);
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
e05ef9a8
...
...
@@ -1943,7 +1943,6 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pStore
)
{
if
(
!
pStore
||
(
taosArrayGetSize
(
pStore
->
tbUids
)
==
0
))
{
tsdbDebug
(
"vgId:%d no need to update tbUids since empty uidStore"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
e05ef9a8
...
...
@@ -23,34 +23,34 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "
tglobal
.h"
#include "
executor
.h"
#include "executorimpl.h"
#include "function.h"
#include "stub.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tdatablock.h"
#include "trpc.h"
#include "stub.h"
#include "executor.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tname.h"
#include "trpc.h"
#include "tvariant.h"
namespace
{
enum
{
data_rand
=
0x1
,
data_asc
=
0x2
,
data_asc
=
0x2
,
data_desc
=
0x3
,
};
typedef
struct
SDummyInputInfo
{
int32_t
totalPages
;
// numOfPages
int32_t
totalPages
;
// numOfPages
int32_t
current
;
int32_t
startVal
;
int32_t
type
;
int32_t
numOfRowsPerPage
;
int32_t
numOfCols
;
// number of columns
int32_t
numOfCols
;
// number of columns
int64_t
tsStart
;
SSDataBlock
*
pBlock
;
}
SDummyInputInfo
;
...
...
@@ -75,26 +75,26 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
taosArrayPush
(
pInfo
->
pBlock
->
pDataBlock
,
&
colInfo
);
// SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
// colInfo1.info.bytes = 40;
// colInfo1.info.colId = 2;
//
// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
// colInfo1.varmeta.length = 0;
// colInfo1.varmeta.offset = static_cast<int32_t*>(taosMemoryCalloc(1, numOfRows * sizeof(int32_t)));
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
// SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
// colInfo1.info.bytes = 40;
// colInfo1.info.colId = 2;
//
// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
// colInfo1.varmeta.length = 0;
// colInfo1.varmeta.offset = static_cast<int32_t*>(taosMemoryCalloc(1, numOfRows * sizeof(int32_t)));
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
}
else
{
blockDataCleanup
(
pInfo
->
pBlock
);
}
SSDataBlock
*
pBlock
=
pInfo
->
pBlock
;
char
buf
[
128
]
=
{
0
};
char
b1
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
char
b1
[
128
]
=
{
0
};
int32_t
v
=
0
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfRowsPerPage
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfRowsPerPage
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
static_cast
<
SColumnInfoData
*>
(
TARRAY_GET_ELEM
(
pBlock
->
pDataBlock
,
0
));
if
(
pInfo
->
type
==
data_desc
)
{
...
...
@@ -107,11 +107,11 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
colDataAppend
(
pColInfo
,
i
,
reinterpret_cast
<
const
char
*>
(
&
v
),
false
);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock
->
info
.
rows
=
pInfo
->
numOfRowsPerPage
;
...
...
@@ -137,7 +137,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
colInfo
.
info
.
bytes
=
sizeof
(
int64_t
);
colInfo
.
info
.
colId
=
1
;
colInfo
.
pData
=
static_cast
<
char
*>
(
taosMemoryCalloc
(
pInfo
->
numOfRowsPerPage
,
sizeof
(
int64_t
)));
// colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
// colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
taosArrayPush
(
pInfo
->
pBlock
->
pDataBlock
,
&
colInfo
);
...
...
@@ -156,11 +156,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SSDataBlock
*
pBlock
=
pInfo
->
pBlock
;
char
buf
[
128
]
=
{
0
};
char
b1
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
char
b1
[
128
]
=
{
0
};
int64_t
ts
=
0
;
int32_t
v
=
0
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfRowsPerPage
;
++
i
)
{
int32_t
v
=
0
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfRowsPerPage
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
static_cast
<
SColumnInfoData
*>
(
TARRAY_GET_ELEM
(
pBlock
->
pDataBlock
,
0
));
ts
=
(
++
pInfo
->
tsStart
);
...
...
@@ -177,11 +177,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
colDataAppend
(
pColInfo1
,
i
,
reinterpret_cast
<
const
char
*>
(
&
v
),
false
);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock
->
info
.
rows
=
pInfo
->
numOfRowsPerPage
;
...
...
@@ -191,10 +191,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow
(
pBlock
);
return
pBlock
;
}
SOperatorInfo
*
createDummyOperator
(
int32_t
startVal
,
int32_t
numOfBlocks
,
int32_t
rowsPerPage
,
int32_t
type
,
int32_t
numOfCols
)
{
SOperatorInfo
*
createDummyOperator
(
int32_t
startVal
,
int32_t
numOfBlocks
,
int32_t
rowsPerPage
,
int32_t
type
,
int32_t
numOfCols
)
{
SOperatorInfo
*
pOperator
=
static_cast
<
SOperatorInfo
*>
(
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
)));
pOperator
->
name
=
"dummyInputOpertor4Test"
;
...
...
@@ -204,24 +204,25 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator
->
fpSet
.
getNextFn
=
get2ColsDummyBlock
;
}
SDummyInputInfo
*
pInfo
=
(
SDummyInputInfo
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SDummyInputInfo
));
SDummyInputInfo
*
pInfo
=
(
SDummyInputInfo
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SDummyInputInfo
));
pInfo
->
totalPages
=
numOfBlocks
;
pInfo
->
startVal
=
startVal
;
pInfo
->
startVal
=
startVal
;
pInfo
->
numOfRowsPerPage
=
rowsPerPage
;
pInfo
->
type
=
type
;
pInfo
->
tsStart
=
1620000000000
;
pInfo
->
type
=
type
;
pInfo
->
tsStart
=
1620000000000
;
pOperator
->
info
=
pInfo
;
return
pOperator
;
}
}
}
// namespace
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
TEST
(
testCase
,
build_executor_tree_Test
)
{
const
char
*
msg
=
"{
\n
"
const
char
*
msg
=
"{
\n
"
"
\"
NodeType
\"
:
\"
48
\"
,
\n
"
"
\"
Name
\"
:
\"
PhysiSubplan
\"
,
\n
"
"
\"
PhysiSubplan
\"
: {
\n
"
...
...
@@ -938,16 +939,19 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo
*
pTaskInfo
=
nullptr
;
DataSinkHandle
sinkHandle
=
nullptr
;
SReadHandle
handle
=
{
reinterpret_cast
<
void
*>
(
0x1
),
reinterpret_cast
<
void
*>
(
0x1
),
NULL
};
SReadHandle
handle
=
{
reinterpret_cast
<
void
*>
(
0x1
),
reinterpret_cast
<
void
*>
(
0x1
),
NULL
};
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
ASSERT_EQ
(
code
,
0
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
,
OPTR_EXEC_MODEL_BATCH
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
,
OPTR_EXEC_MODEL_BATCH
);
ASSERT_EQ
(
code
,
0
);
}
TEST
(
testCase
,
index_plan_test
)
{
// add later
EXPECT_EQ
(
0
,
0
);
}
#if 0
TEST(testCase, inMem_sort_Test) {
...
...
@@ -983,19 +987,19 @@ TEST(testCase, inMem_sort_Test) {
typedef
struct
su
{
int32_t
v
;
char
*
c
;
char
*
c
;
}
su
;
int32_t
cmp
(
const
void
*
p1
,
const
void
*
p2
)
{
su
*
v1
=
(
su
*
)
p1
;
su
*
v2
=
(
su
*
)
p2
;
su
*
v1
=
(
su
*
)
p1
;
su
*
v2
=
(
su
*
)
p2
;
int32_t
x1
=
*
(
int32_t
*
)
v1
->
c
;
int32_t
x2
=
*
(
int32_t
*
)
v2
->
c
;
int32_t
x1
=
*
(
int32_t
*
)
v1
->
c
;
int32_t
x2
=
*
(
int32_t
*
)
v2
->
c
;
if
(
x1
==
x2
)
{
return
0
;
}
else
{
return
x1
<
x2
?
-
1
:
1
;
return
x1
<
x2
?
-
1
:
1
;
}
}
...
...
@@ -1228,4 +1232,4 @@ TEST(testCase, time_interval_Operator_Test) {
}
#endif
#pragma GCC diagnosti
\ No newline at end of file
#pragma GCC diagnosti
source/libs/index/src/indexCache.c
浏览文件 @
e05ef9a8
...
...
@@ -278,9 +278,9 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
break
;
}
CacheTerm
*
c
=
(
CacheTerm
*
)
SL_GET_NODE_DATA
(
node
);
printf
(
"json val: %s
\n
"
,
c
->
colVal
);
if
(
0
!=
strncmp
(
c
->
colVal
,
term
->
colName
,
term
->
nColName
))
{
continue
;
//
printf("json val: %s\n", c->colVal);
if
(
0
!=
strncmp
(
c
->
colVal
,
pCt
->
colVal
,
skip
))
{
break
;
}
TExeCond
cond
=
cmpFn
(
c
->
colVal
+
skip
,
term
->
colVal
,
dType
);
...
...
@@ -640,30 +640,30 @@ static int indexFindCh(char* a, char c) {
return
p
-
a
;
}
static
int
indexCacheJsonTermCompareImpl
(
char
*
a
,
char
*
b
)
{
int
alen
=
indexFindCh
(
a
,
'&'
);
int
blen
=
indexFindCh
(
b
,
'&'
);
int
cmp
=
strncmp
(
a
,
b
,
MIN
(
alen
,
blen
));
if
(
cmp
==
0
)
{
cmp
=
alen
-
blen
;
if
(
cmp
!=
0
)
{
return
cmp
;
}
cmp
=
*
(
a
+
alen
)
-
*
(
b
+
blen
);
if
(
cmp
!=
0
)
{
return
cmp
;
}
alen
+=
2
;
blen
+=
2
;
cmp
=
strcmp
(
a
+
alen
,
b
+
blen
);
}
return
cmp
;
//
int alen = indexFindCh(a, '&');
//
int blen = indexFindCh(b, '&');
//
int cmp = strncmp(a, b, MIN(alen, blen));
//
if (cmp == 0) {
//
cmp = alen - blen;
//
if (cmp != 0) {
//
return cmp;
//
}
//
cmp = *(a + alen) - *(b + blen);
//
if (cmp != 0) {
//
return cmp;
//
}
//
alen += 2;
//
blen += 2;
//
cmp = strcmp(a + alen, b + blen);
//
}
return
0
;
}
static
int32_t
indexCacheJsonTermCompare
(
const
void
*
l
,
const
void
*
r
)
{
CacheTerm
*
lt
=
(
CacheTerm
*
)
l
;
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
// compare colVal
int
cmp
=
indexCacheJsonTermCompareImpl
(
lt
->
colVal
,
rt
->
colVal
);
int
32_t
cmp
=
strcmp
(
lt
->
colVal
,
rt
->
colVal
);
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
...
...
@@ -704,6 +704,8 @@ static bool indexCacheIteratorNext(Iterate* itera) {
iv
->
type
=
ct
->
operaType
;
iv
->
ver
=
ct
->
version
;
iv
->
colVal
=
tstrdup
(
ct
->
colVal
);
// printf("col Val: %s\n", iv->colVal);
// iv->colType = cv->colType;
taosArrayPush
(
iv
->
val
,
&
ct
->
uid
);
}
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
e05ef9a8
...
...
@@ -334,7 +334,12 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
FstSlice
*
s
=
&
rt
->
data
;
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
NULL
);
TExeCond
cond
=
cmpFn
(
ch
,
p
,
tem
->
colType
);
// if (0 != strncmp(ch, tem->colName, tem->nColName)) {
// swsResultDestroy(rt);
// break;
//}
TExeCond
cond
=
cmpFn
(
ch
,
p
,
tem
->
colType
);
if
(
MATCH
==
cond
)
{
tfileReaderLoadTableIds
((
TFileReader
*
)
reader
,
rt
->
out
.
out
,
tr
->
total
);
}
else
if
(
CONTINUE
==
cond
)
{
...
...
@@ -455,16 +460,22 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
p
,
AUTOMATION_PREFIX
);
FstStreamBuilder
*
sb
=
fstSearch
(((
TFileReader
*
)
reader
)
->
fst
,
ctx
);
FstSlice
h
=
fstSliceCreate
((
uint8_t
*
)
p
,
skip
);
fstStreamBuilderSetRange
(
sb
,
&
h
,
ctype
);
fstSliceDestroy
(
&
h
);
//
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
//
fstStreamBuilderSetRange(sb, &h, ctype);
//
fstSliceDestroy(&h);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
FstSlice
*
s
=
&
rt
->
data
;
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
NULL
);
TExeCond
cond
=
cmpFn
(
ch
,
p
,
tem
->
colType
);
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
NULL
);
if
(
0
!=
strncmp
(
ch
,
p
,
skip
))
{
swsResultDestroy
(
rt
);
break
;
}
TExeCond
cond
=
cmpFn
(
ch
+
skip
,
tem
->
colVal
,
tem
->
colType
);
if
(
MATCH
==
cond
)
{
tfileReaderLoadTableIds
((
TFileReader
*
)
reader
,
rt
->
out
.
out
,
tr
->
total
);
}
else
if
(
CONTINUE
==
cond
)
{
...
...
@@ -594,13 +605,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
if
(
tfileWriteData
(
tw
,
v
)
!=
0
)
{
indexError
(
"failed to write data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
(
int
)
taosArrayGetSize
(
v
->
tableId
));
// printf("write faile\n");
}
else
{
// printf("write sucee\n");
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
}
}
fstBuilderFinish
(
tw
->
fb
);
fstBuilderDestroy
(
tw
->
fb
);
tw
->
fb
=
NULL
;
...
...
@@ -845,18 +859,24 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
uint8_t
colType
=
header
->
colType
;
colType
=
INDEX_TYPE_GET_TYPE
(
colType
);
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)(
tval
->
colVal
),
(
size_t
)
strlen
(
tval
->
colVal
));
if
(
fstBuilderInsert
(
write
->
fb
,
key
,
tval
->
offset
))
{
fstSliceDestroy
(
&
key
);
return
0
;
}
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)(
tval
->
colVal
),
(
size_t
)
strlen
(
tval
->
colVal
));
if
(
fstBuilderInsert
(
write
->
fb
,
key
,
tval
->
offset
))
{
fstSliceDestroy
(
&
key
);
return
-
1
;
}
else
{
// handle other type later
return
0
;
}
return
0
;
return
-
1
;
// if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
// FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
// if (fstBuilderInsert(write->fb, key, tval->offset)) {
// fstSliceDestroy(&key);
// return 0;
// }
// fstSliceDestroy(&key);
// return -1;
//} else {
// // handle other type later
//}
}
static
int
tfileWriteFooter
(
TFileWriter
*
write
)
{
char
buf
[
sizeof
(
tfileMagicNumber
)
+
1
]
=
{
0
};
...
...
@@ -913,8 +933,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
)
{
// TODO(yihao): opt later
WriterCtx
*
ctx
=
reader
->
ctx
;
char
block
[
1024
]
=
{
0
};
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
block
,
sizeof
(
block
),
offset
);
// add block cache
char
block
[
1024
]
=
{
0
};
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
block
,
sizeof
(
block
),
offset
);
assert
(
nread
>=
sizeof
(
uint32_t
));
char
*
p
=
block
;
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
e05ef9a8
...
...
@@ -272,9 +272,8 @@ void checkFstCheckIterator1() {
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
fw
->
Put
(
"Hello world"
,
1
);
fw
->
Put
(
"Hello worle"
,
2
);
fw
->
Put
(
"hello worlf"
,
4
);
fw
->
Put
(
"test1&^D&10"
,
1
);
fw
->
Put
(
"test2&^D&10"
,
2
);
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
...
...
@@ -645,11 +644,11 @@ int main(int argc, char* argv[]) {
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
//}
checkFstCheckIterator1
();
checkFstCheckIterator2
();
checkFstCheckIteratorPrefix
();
checkFstCheckIteratorRange1
();
checkFstCheckIteratorRange2
();
checkFstCheckIteratorRange3
();
//
checkFstCheckIterator2();
//
checkFstCheckIteratorPrefix();
//
checkFstCheckIteratorRange1();
//
checkFstCheckIteratorRange2();
//
checkFstCheckIteratorRange3();
// checkFstLongTerm();
// checkFstPrefixSearch();
...
...
source/libs/index/test/jsonUT.cc
浏览文件 @
e05ef9a8
...
...
@@ -181,3 +181,240 @@ TEST_F(JsonEnv, testWriteMillonData) {
}
}
}
TEST_F
(
JsonEnv
,
testWriteJsonNumberData
)
{
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test2"
);
std
::
string
colVal
(
"20"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test2"
);
std
::
string
colVal
(
"15"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test2"
);
std
::
string
colVal
(
"15"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
0
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
0
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
TEST_F
(
JsonEnv
,
testWriteJsonTfileAndCache
)
{
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"xxxxxxxxxxxxxxxxxxx"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
0
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
0
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
std
::
string
colName
(
"test1"
);
std
::
string
colVal
(
"10"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
tIndexJsonSearch
(
index
,
mq
,
result
);
EXPECT_EQ
(
0
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
source/libs/transport/inc/transComm.h
浏览文件 @
e05ef9a8
...
...
@@ -105,8 +105,8 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT
1
0 // retry count limit
#define TRANS_RETRY_INTERVAL
5
// ms retry interval
#define TRANS_RETRY_COUNT_LIMIT
2
0 // retry count limit
#define TRANS_RETRY_INTERVAL
15
// ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef
struct
{
...
...
source/os/CMakeLists.txt
浏览文件 @
e05ef9a8
...
...
@@ -17,15 +17,25 @@ endif ()
if
(
USE_TD_MEMORY
)
add_definitions
(
-DUSE_TD_MEMORY
)
endif
()
if
(
BUILD_ADDR2LINE
)
target_include_directories
(
os
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/libdwarf/src/lib/libdwarf"
)
add_definitions
(
-DUSE_ADDR2LINE
)
target_link_libraries
(
os PUBLIC addr2line dl z
)
endif
()
target_link_libraries
(
os pthread
os
PUBLIC
pthread
)
if
(
NOT TD_WINDOWS
)
target_link_libraries
(
os dl m rt
os
PUBLIC
dl m rt
)
else
()
target_link_libraries
(
os ws2_32 iconv msvcregex wcwidth winmm
os
PUBLIC
ws2_32 iconv msvcregex wcwidth winmm
)
endif
(
NOT TD_WINDOWS
)
source/os/src/osMemory.c
浏览文件 @
e05ef9a8
...
...
@@ -17,7 +17,7 @@
#include <malloc.h>
#include "os.h"
#if
def USE_TD_MEMORY
#if
defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
#define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S')
...
...
@@ -71,14 +71,112 @@ int32_t taosBackTrace(void **buffer, int32_t size) {
return
frame
;
}
#endif
// char **taosBackTraceSymbols(int32_t *size) {
// void *buffer[20] = {NULL};
// *size = taosBackTrace(buffer, 20);
// return backtrace_symbols(buffer, *size);
// }
#ifdef USE_ADDR2LINE
#include "osThread.h"
#include "libdwarf.h"
#include "dwarf.h"
#define DW_PR_DUu "llu"
typedef
struct
lookup_table
{
Dwarf_Line
*
table
;
Dwarf_Line_Context
*
ctxts
;
int
cnt
;
Dwarf_Addr
low
;
Dwarf_Addr
high
;
}
lookup_tableT
;
extern
int
create_lookup_table
(
Dwarf_Debug
dbg
,
lookup_tableT
*
lookup_table
);
extern
void
delete_lookup_table
(
lookup_tableT
*
lookup_table
);
size_t
addr
=
0
;
lookup_tableT
lookup_table
;
Dwarf_Debug
tDbg
;
static
TdThreadOnce
traceThreadInit
=
PTHREAD_ONCE_INIT
;
void
endTrace
()
{
if
(
traceThreadInit
!=
PTHREAD_ONCE_INIT
)
{
delete_lookup_table
(
&
lookup_table
);
dwarf_finish
(
tDbg
);
}
}
void
startTrace
()
{
int
ret
;
Dwarf_Ptr
errarg
=
0
;
FILE
*
fp
=
fopen
(
"/proc/self/maps"
,
"r"
);
fscanf
(
fp
,
"%lx-"
,
&
addr
);
fclose
(
fp
);
ret
=
dwarf_init_path
(
"/proc/self/exe"
,
NULL
,
0
,
DW_GROUPNUMBER_ANY
,
NULL
,
errarg
,
&
tDbg
,
NULL
);
if
(
ret
==
DW_DLV_NO_ENTRY
)
{
printf
(
"Unable to open file"
);
return
;
}
ret
=
create_lookup_table
(
tDbg
,
&
lookup_table
);
if
(
ret
!=
DW_DLV_OK
)
{
printf
(
"Unable to create lookup table"
);
return
;
}
atexit
(
endTrace
);
}
static
void
print_line
(
Dwarf_Debug
dbg
,
Dwarf_Line
line
,
Dwarf_Addr
pc
)
{
char
*
linesrc
=
"??"
;
Dwarf_Unsigned
lineno
=
0
;
if
(
line
)
{
dwarf_linesrc
(
line
,
&
linesrc
,
NULL
);
dwarf_lineno
(
line
,
&
lineno
,
NULL
);
}
printf
(
"%s:%"
DW_PR_DUu
"
\n
"
,
linesrc
,
lineno
);
if
(
line
)
dwarf_dealloc
(
dbg
,
linesrc
,
DW_DLA_STRING
);
}
void
taosPrintBackTrace
()
{
int
size
=
20
;
void
**
buffer
[
20
];
Dwarf_Addr
pc
;
int32_t
frame
=
0
;
void
**
ebp
;
void
**
ret
=
NULL
;
size_t
func_frame_distance
=
0
;
taosThreadOnce
(
&
traceThreadInit
,
startTrace
);
if
(
buffer
!=
NULL
&&
size
>
0
)
{
ebp
=
taosGetEbp
();
func_frame_distance
=
(
size_t
)
*
ebp
-
(
size_t
)
ebp
;
while
(
ebp
&&
frame
<
size
&&
(
func_frame_distance
<
(
1ULL
<<
24
))
&&
(
func_frame_distance
>
0
))
{
ret
=
ebp
+
1
;
buffer
[
frame
++
]
=
*
ret
;
ebp
=
(
void
**
)(
*
ebp
);
func_frame_distance
=
(
size_t
)
*
ebp
-
(
size_t
)
ebp
;
}
for
(
size_t
i
=
0
;
i
<
frame
;
i
++
)
{
pc
=
(
size_t
)
buffer
[
i
]
-
addr
;
if
(
pc
>
0
)
{
if
(
pc
>=
lookup_table
.
low
&&
pc
<
lookup_table
.
high
)
{
Dwarf_Line
line
=
lookup_table
.
table
[
pc
-
lookup_table
.
low
];
if
(
line
)
print_line
(
tDbg
,
line
,
pc
);
}
}
}
}
}
#endif
#endif
#endif
#ifndef USE_ADDR2LINE
void
taosPrintBackTrace
()
{
return
;
}
#endif
void
*
taosMemoryMalloc
(
int32_t
size
)
{
...
...
tests/test/c/tmqSim.c
浏览文件 @
e05ef9a8
...
...
@@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
)
{
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
pInfo
->
consumeMsgCnt
=
totalMsgs
;
pInfo
->
consumeRowCnt
=
totalRows
;
...
...
@@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) {
return
NULL
;
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
)
{
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
pInfo
->
tmq
=
NULL
;
// save consume result into consumeresult table
saveConsumeResult
(
pInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录