Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4ed37aee
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4ed37aee
编写于
1月 27, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/mnode
上级
3152824a
5fcfebe8
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
807 addition
and
236 deletion
+807
-236
cmake/cmake.options
cmake/cmake.options
+6
-0
include/client/taos.h
include/client/taos.h
+18
-17
include/common/common.h
include/common/common.h
+88
-10
include/common/tmsg.h
include/common/tmsg.h
+49
-16
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+25
-7
source/client/src/clientMain.c
source/client/src/clientMain.c
+67
-0
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+3
-2
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+47
-47
source/common/src/tep.c
source/common/src/tep.c
+97
-0
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+1
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-3
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+8
-5
source/dnode/vnode/inc/meta.h
source/dnode/vnode/inc/meta.h
+0
-5
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+6
-5
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+11
-8
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+65
-19
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+101
-53
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-3
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+0
-2
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+6
-4
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+0
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+4
-6
source/libs/transport/CMakeLists.txt
source/libs/transport/CMakeLists.txt
+2
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+50
-13
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+44
-6
source/libs/transport/test/CMakeLists.txt
source/libs/transport/test/CMakeLists.txt
+20
-0
source/libs/transport/test/transUT.cc
source/libs/transport/test/transUT.cc
+82
-0
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-0
未找到文件。
cmake/cmake.options
浏览文件 @
4ed37aee
...
...
@@ -50,6 +50,12 @@ option(
OFF
)
option(
BUILD_WITH_UV_TRANS
"If build with libuv_trans "
OFF
)
option(
BUILD_WITH_CRAFT
"If build with canonical-raft"
...
...
include/client/taos.h
浏览文件 @
4ed37aee
...
...
@@ -92,14 +92,16 @@ typedef struct taosField {
typedef
void
(
*
__taos_async_fn_t
)(
void
*
param
,
TAOS_RES
*
,
int
code
);
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
const
char
*
taos_data_type
(
int
type
);
typedef
struct
tmq_message_t
tmq_message_t
;
typedef
struct
tmq_message_topic_t
tmq_message_topic_t
;
typedef
struct
tmq_message_tb_t
tmq_message_tb_t
;
typedef
struct
tmq_tb_iter_t
tmq_tb_iter_t
;
typedef
struct
tmq_message_col_t
tmq_message_col_t
;
typedef
struct
tmq_col_iter_t
tmq_col_iter_t
;
typedef
struct
TAOS_BIND
{
int
buffer_type
;
...
...
@@ -134,6 +136,15 @@ typedef struct TAOS_MULTI_BIND {
int
num
;
}
TAOS_MULTI_BIND
;
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
const
char
*
taos_data_type
(
int
type
);
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
);
...
...
@@ -192,16 +203,6 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
typedef
struct
tmq_message_t
tmq_message_t
;
typedef
struct
tmq_message_topic_t
tmq_message_topic_t
;
typedef
struct
tmq_message_tb_t
tmq_message_tb_t
;
typedef
struct
tmq_tb_iter_t
tmq_tb_iter_t
;
typedef
struct
tmq_message_col_t
tmq_message_col_t
;
typedef
struct
tmq_col_iter_t
tmq_col_iter_t
;
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
char
*
);
...
...
include/common/common.h
浏览文件 @
4ed37aee
...
...
@@ -38,6 +38,12 @@
// int16_t bytes;
//} SSchema;
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
typedef
struct
SColumnDataAgg
{
int16_t
colId
;
int64_t
sum
;
...
...
@@ -57,17 +63,12 @@ typedef struct SDataBlockInfo {
typedef
struct
SConstantItem
{
SColumnInfo
info
;
int32_t
start
Index
;
// run-length-encoding to save the space for multiple rows
int32_t
end
Index
;
int32_t
start
Row
;
// run-length-encoding to save the space for multiple rows
int32_t
end
Row
;
SVariant
value
;
}
SConstantItem
;
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef
struct
SSDataBlock
{
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
...
...
@@ -75,11 +76,88 @@ typedef struct SSDataBlock {
SDataBlockInfo
info
;
}
SSDataBlock
;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef
struct
SColumnInfoData
{
SColumnInfo
info
;
// TODO filter info needs to be removed
char
*
pData
;
// the corresponding block data in memory
SColumnInfo
info
;
// TODO filter info needs to be removed
char
*
nullbitmap
;
//
char
*
pData
;
// the corresponding block data in memory
}
SColumnInfoData
;
static
FORCE_INLINE
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
)
{
int64_t
tbUid
=
pBlock
->
info
.
uid
;
int32_t
numOfCols
=
pBlock
->
info
.
numOfCols
;
int32_t
rows
=
pBlock
->
info
.
rows
;
int32_t
sz
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
tbUid
);
tlen
+=
taosEncodeFixedI32
(
buf
,
numOfCols
);
tlen
+=
taosEncodeFixedI32
(
buf
,
rows
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SColumnInfoData
*
pColData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
tlen
+=
taosEncodeFixedI16
(
buf
,
pColData
->
info
.
colId
);
tlen
+=
taosEncodeFixedI16
(
buf
,
pColData
->
info
.
type
);
tlen
+=
taosEncodeFixedI16
(
buf
,
pColData
->
info
.
bytes
);
int32_t
colSz
=
rows
*
pColData
->
info
.
bytes
;
tlen
+=
taosEncodeBinary
(
buf
,
pColData
->
pData
,
colSz
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeDataBlock
(
void
*
buf
,
SSDataBlock
*
pBlock
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pBlock
->
info
.
uid
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pBlock
->
info
.
numOfCols
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pBlock
->
info
.
rows
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pBlock
->
pDataBlock
=
taosArrayInit
(
sz
,
sizeof
(
SColumnInfoData
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SColumnInfoData
data
;
buf
=
taosDecodeFixedI16
(
buf
,
&
data
.
info
.
colId
);
buf
=
taosDecodeFixedI16
(
buf
,
&
data
.
info
.
type
);
buf
=
taosDecodeFixedI16
(
buf
,
&
data
.
info
.
bytes
);
int32_t
colSz
=
pBlock
->
info
.
rows
*
data
.
info
.
bytes
;
buf
=
taosDecodeBinary
(
buf
,
(
void
**
)
&
data
.
pData
,
colSz
);
taosArrayPush
(
pBlock
->
pDataBlock
,
&
data
);
}
return
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSMqConsumeRsp
(
void
**
buf
,
const
SMqConsumeRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
int32_t
sz
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
if
(
pRsp
->
pBlockData
)
{
sz
=
taosArrayGetSize
(
pRsp
->
pBlockData
);
}
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pBlock
=
(
SSDataBlock
*
)
taosArrayGet
(
pRsp
->
pBlockData
,
i
);
tlen
+=
tEncodeDataBlock
(
buf
,
pBlock
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumeRsp
(
void
*
buf
,
SMqConsumeRsp
*
pRsp
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
pRsp
->
schemas
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pRsp
->
schemas
==
NULL
)
return
NULL
;
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pRsp
->
pBlockData
=
taosArrayInit
(
sz
,
sizeof
(
SSDataBlock
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
block
;
tDecodeDataBlock
(
buf
,
&
block
);
taosArrayPush
(
pRsp
->
pBlockData
,
&
block
);
}
return
buf
;
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef
struct
SColumn
{
...
...
include/common/tmsg.h
浏览文件 @
4ed37aee
...
...
@@ -289,6 +289,7 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
}
return
buf
;
}
typedef
struct
{
int32_t
acctId
;
int64_t
clusterId
;
...
...
@@ -296,6 +297,7 @@ typedef struct {
int8_t
superUser
;
int8_t
align
[
3
];
SEpSet
epSet
;
char
sVersion
[
128
];
}
SConnectRsp
;
typedef
struct
{
...
...
@@ -1588,16 +1590,53 @@ typedef struct SMqSetCVgRsp {
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
}
SMqSetCVgRsp
;
typedef
struct
SMqColData
{
int16_t
colId
;
int16_t
type
;
int16_t
bytes
;
}
SMqColMeta
;
typedef
struct
{
uint32_t
nCols
;
SSchema
*
pSchema
;
}
SSchemaWrapper
;
static
FORCE_INLINE
int32_t
tEncodeSSchema
(
void
**
buf
,
const
SSchema
*
pSchema
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pSchema
->
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSchema
->
bytes
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSchema
->
colId
);
tlen
+=
taosEncodeString
(
buf
,
pSchema
->
name
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSSchema
(
void
*
buf
,
SSchema
*
pSchema
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pSchema
->
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSchema
->
bytes
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSchema
->
colId
);
buf
=
taosDecodeStringTo
(
buf
,
pSchema
->
name
);
return
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSSchemaWrapper
(
void
**
buf
,
const
SSchemaWrapper
*
pSW
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pSW
->
nCols
);
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
tlen
+=
tEncodeSSchema
(
buf
,
&
pSW
->
pSchema
[
i
]);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSSchemaWrapper
(
void
*
buf
,
SSchemaWrapper
*
pSW
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
pSW
->
nCols
);
pSW
->
pSchema
=
(
SSchema
*
)
calloc
(
pSW
->
nCols
,
sizeof
(
SSchema
));
if
(
pSW
->
pSchema
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
buf
=
tDecodeSSchema
(
buf
,
&
pSW
->
pSchema
[
i
]);
}
return
buf
;
}
typedef
struct
SMqTbData
{
int64_t
uid
;
int32_t
numOfRows
;
char
colData
[]
;
char
*
colData
;
}
SMqTbData
;
typedef
struct
SMqTopicBlk
{
...
...
@@ -1612,18 +1651,12 @@ typedef struct SMqTopicBlk {
}
SMqTopicData
;
typedef
struct
SMqConsumeRsp
{
int64_t
consumerId
;
int32_t
numOfCols
;
SMqColMeta
*
meta
;
int32_t
numOfTopics
;
SMqTopicData
*
data
;
int64_t
consumerId
;
SSchemaWrapper
*
schemas
;
int32_t
numOfTopics
;
SArray
*
pBlockData
;
//SArray<SSDataBlock>
}
SMqConsumeRsp
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumeRsp
(
void
**
buf
,
const
SMqConsumeRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
return
tlen
;
}
// one req for one vg+topic
typedef
struct
SMqConsumeReq
{
SMsgHead
head
;
...
...
source/client/inc/clientInt.h
浏览文件 @
4ed37aee
...
...
@@ -114,6 +114,7 @@ typedef struct STscObj {
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
ver
[
128
];
int32_t
acctId
;
uint32_t
connId
;
int32_t
connType
;
...
...
source/client/src/clientImpl.c
浏览文件 @
4ed37aee
...
...
@@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo
*
p
=
calloc
(
1
,
sizeof
(
struct
SAppInstInfo
));
p
->
mgmtEp
=
epSet
;
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
p
->
pAppHbMgr
=
appHbMgrInit
(
p
);
/*p->pAppHbMgr = appHbMgrInit(p);*/
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
pInst
=
&
p
;
...
...
@@ -218,12 +218,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pSchema
,
numOfCols
);
tfree
(
pSchema
);
pRequest
->
type
=
TDMT_VND_QUERY
;
}
else
{
tfree
(
pSchema
);
}
tfree
(
pSchema
);
return
code
;
}
...
...
@@ -621,6 +619,27 @@ struct tmq_message_t {
};
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqConsumeRsp
rsp
;
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
printf
(
"| %s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
}
printf
(
"
\n
"
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
colNum
;
j
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
j
);
for
(
int32_t
k
=
0
;
k
<
rows
;
k
++
)
{
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
k
*
pColInfoData
->
info
.
bytes
);
if
(
j
==
0
)
printf
(
" %ld "
,
*
(
int64_t
*
)
var
);
if
(
j
==
1
)
printf
(
" %d "
,
*
(
int32_t
*
)
var
);
}
}
/*pDataBlock->*/
}
return
0
;
}
...
...
@@ -721,9 +740,9 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
/*sendInfo->requestObjRefId = 0;*/
sendInfo
->
requestObjRefId
=
0
;
/*sendInfo->param = &tmq_message;*/
/*sendInfo->fp = tmq_poll_cb_inner;*/
sendInfo
->
fp
=
tmq_poll_cb_inner
;
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
...
...
@@ -776,7 +795,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if
(
qIsDdlQuery
(
pQueryNode
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQueryNode
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
pRequest
->
code
=
terrno
;
...
...
source/client/src/clientMain.c
浏览文件 @
4ed37aee
...
...
@@ -275,3 +275,70 @@ int taos_affected_rows(TAOS_RES *res) {
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
return
TSDB_TIME_PRECISION_MILLI
;
}
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
TSDB_CODE_TSC_DISCONNECTED
;
}
if
(
db
==
NULL
||
strlen
(
db
)
==
0
)
{
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
return
terrno
;
}
char
sql
[
256
]
=
{
0
};
snprintf
(
sql
,
tListLen
(
sql
),
"use %s"
,
db
);
TAOS_RES
*
pRequest
=
taos_query
(
taos
,
sql
);
int32_t
code
=
taos_errno
(
pRequest
);
taos_free_result
(
pRequest
);
return
code
;
}
void
taos_stop_query
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
)
{
return
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
// It is not a query, no need to stop.
if
(
numOfFields
==
0
)
{
return
;
}
// scheduleCancelJob(pRequest->body.pQueryJob);
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
return
false
;
}
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
return
0
;
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
const
char
*
taos_get_server_info
(
TAOS
*
taos
)
{
if
(
taos
==
NULL
)
{
return
NULL
;
}
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
return
pTscObj
->
ver
;
}
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
}
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
}
\ No newline at end of file
source/client/src/clientMsgHandler.c
浏览文件 @
4ed37aee
...
...
@@ -67,13 +67,14 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj
->
connId
=
pConnect
->
connId
;
pTscObj
->
acctId
=
pConnect
->
acctId
;
tstrncpy
(
pTscObj
->
ver
,
pConnect
->
sVersion
,
tListLen
(
pTscObj
->
ver
));
// update the appInstInfo
pTscObj
->
pAppInfo
->
clusterId
=
pConnect
->
clusterId
;
atomic_add_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
SClientHbKey
connKey
=
{.
connId
=
pConnect
->
connId
,
.
hbType
=
HEARTBEAT_TYPE_QUERY
};
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connKey
,
NULL
);
/*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
pConnect
->
clusterId
,
...
...
source/client/test/clientTests.cpp
浏览文件 @
4ed37aee
...
...
@@ -237,27 +237,27 @@ TEST(testCase, use_db_test) {
taos_close
(
pConn
);
}
TEST
(
testCase
,
drop_db_test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
showDB
(
pConn
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
showDB
(
pConn
);
pRes
=
taos_query
(
pConn
,
"create database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"create to drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//
TEST(testCase, drop_db_test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
assert(pConn != NULL);
//
//
showDB(pConn);
//
//
TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
showDB(pConn);
//
//
pRes = taos_query(pConn, "create database abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
TEST
(
testCase
,
create_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -503,30 +503,30 @@ TEST(testCase, show_table_Test) {
taos_close
(
pConn
);
}
TEST
(
testCase
,
drop_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in creating db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in using db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop stable st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop stable, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//
TEST(testCase, drop_stable_Test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
assert(pConn != nullptr);
//
//
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("error in creating db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
pRes = taos_query(pConn, "use abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("error in using db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
pRes = taos_query(pConn, "drop stable st1");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to drop stable, reason:%s\n", taos_errstr(pRes));
//
}
//
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
TEST
(
testCase
,
generated_request_id_test
)
{
SHashObj
*
phash
=
taosHashInit
(
10000
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_ENTRY_LOCK
);
...
...
@@ -601,7 +601,7 @@ TEST(testCase, tmq_subscribe_Test) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1");
tmq_subscribe(tmq, topic_list);
...
...
@@ -712,7 +712,7 @@ TEST(testCase, agg_query_tables) {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"select count(*) from t
u
"
);
pRes
=
taos_query
(
pConn
,
"select count(*) from t
_x_19
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
...
...
source/common/src/tep.c
浏览文件 @
4ed37aee
#include "tep.h"
#include "common.h"
#include "tglobal.h"
#include "tlockfree.h"
...
...
@@ -59,3 +60,99 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
return
ep
;
}
bool
colDataIsNull
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
totalRows
,
uint32_t
row
,
SColumnDataAgg
*
pColAgg
)
{
if
(
pColAgg
!=
NULL
)
{
if
(
pColAgg
->
numOfNull
==
totalRows
)
{
ASSERT
(
pColumnInfoData
->
nullbitmap
==
NULL
);
return
true
;
}
else
if
(
pColAgg
->
numOfNull
==
0
)
{
ASSERT
(
pColumnInfoData
->
nullbitmap
==
NULL
);
return
false
;
}
}
if
(
pColumnInfoData
->
nullbitmap
==
NULL
)
{
return
false
;
}
uint8_t
v
=
(
pColumnInfoData
->
nullbitmap
[
row
>>
3
]
&
(
1
<<
(
8
-
(
row
&
0x07
))));
return
(
v
==
1
);
}
bool
colDataIsNull_f
(
const
char
*
bitmap
,
uint32_t
row
)
{
return
(
bitmap
[
row
>>
3
]
&
(
1
<<
(
8
-
(
row
&
0x07
))));
}
void
colDataSetNull_f
(
char
*
bitmap
,
uint32_t
row
)
{
// TODO
return
;
}
void
*
colDataGet
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
row
)
{
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
uint32_t
offset
=
((
uint32_t
*
)
pColumnInfoData
->
pData
)[
row
];
return
(
char
*
)(
pColumnInfoData
->
pData
)
+
offset
;
// the first part is the pointer to the true binary data
}
else
{
return
(
char
*
)(
pColumnInfoData
->
pData
)
+
(
row
*
pColumnInfoData
->
info
.
bytes
);
}
}
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
if
(
isNull
)
{
// TODO set null value in the nullbitmap
return
0
;
}
int32_t
type
=
pColumnInfoData
->
info
.
type
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
// TODO continue append var_type
}
else
{
char
*
p
=
pColumnInfoData
->
pData
+
pColumnInfoData
->
info
.
bytes
*
currentRow
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:
{
*
(
int8_t
*
)
p
=
*
(
int8_t
*
)
pData
;
break
;}
default:
assert
(
0
);
}
}
return
0
;
}
size_t
colDataGetCols
(
const
SSDataBlock
*
pBlock
)
{
ASSERT
(
pBlock
);
size_t
constantCols
=
(
pBlock
->
pConstantList
!=
NULL
)
?
taosArrayGetSize
(
pBlock
->
pConstantList
)
:
0
;
ASSERT
(
pBlock
->
info
.
numOfCols
==
taosArrayGetSize
(
pBlock
->
pDataBlock
)
+
constantCols
);
return
pBlock
->
info
.
numOfCols
;
}
size_t
colDataGetRows
(
const
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
rows
;
}
int32_t
colDataUpdateTsWindow
(
SSDataBlock
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
||
pDataBlock
->
info
.
rows
<=
0
)
{
return
0
;
}
if
(
pDataBlock
->
info
.
numOfCols
<=
0
)
{
return
-
1
;
}
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
0
);
if
(
pColInfoData
->
info
.
type
!=
TSDB_DATA_TYPE_TIMESTAMP
)
{
return
0
;
}
ASSERT
(
pColInfoData
->
nullbitmap
==
NULL
);
pDataBlock
->
info
.
window
.
skey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
0
);
pDataBlock
->
info
.
window
.
ekey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
(
pDataBlock
->
info
.
rows
-
1
));
return
0
;
}
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
4ed37aee
...
...
@@ -398,7 +398,7 @@ void dndSendStatusReq(SDnode *pDnode) {
static
void
dndUpdateDnodeCfg
(
SDnode
*
pDnode
,
SDnodeCfg
*
pCfg
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
if
(
pMgmt
->
dnodeId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
dInfo
(
"set dnodeId:%d clusterId:
0x
%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dnodeId
=
pCfg
->
dnodeId
;
pMgmt
->
clusterId
=
pCfg
->
clusterId
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
4ed37aee
...
...
@@ -237,13 +237,11 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if
(
pIter
==
NULL
)
break
;
if
(
pObj
->
pDnode
==
NULL
)
break
;
pEpSet
->
eps
[
pEpSet
->
numOfEps
].
port
=
htons
(
pObj
->
pDnode
->
port
);
memcpy
(
pEpSet
->
eps
[
pEpSet
->
numOfEps
].
fqdn
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pObj
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
pEpSet
->
inUse
=
pEpSet
->
numOfEps
;
}
pEpSet
->
numOfEps
++
;
addEpIntoEpSet
(
pEpSet
,
pObj
->
pDnode
->
fqdn
,
htons
(
pObj
->
pDnode
->
port
))
;
sdbRelease
(
pSdb
,
pObj
);
}
}
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
4ed37aee
...
...
@@ -14,14 +14,15 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "mndProfile.h"
#include "mndConsumer.h"
//
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTopic.h"
//
#include "mndTopic.h"
#include "mndUser.h"
#include "mndVgroup.h"
//
#include "mndVgroup.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
...
...
@@ -230,10 +231,12 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
goto
CONN_OVER
;
}
pRsp
->
acctId
=
htonl
(
pUser
->
acctId
);
pRsp
->
acctId
=
htonl
(
pUser
->
acctId
);
pRsp
->
superUser
=
pUser
->
superUser
;
pRsp
->
clusterId
=
htobe64
(
pMnode
->
clusterId
);
pRsp
->
connId
=
htonl
(
pConn
->
id
);
pRsp
->
connId
=
htonl
(
pConn
->
id
);
snprintf
(
pRsp
->
sVersion
,
tListLen
(
pRsp
->
sVersion
),
"ver:%s
\n
build:%s
\n
gitinfo:%s"
,
version
,
buildinfo
,
gitinfo
);
mndGetMnodeEpSet
(
pMnode
,
&
pRsp
->
epSet
);
pReq
->
contLen
=
sizeof
(
SConnectRsp
);
...
...
source/dnode/vnode/inc/meta.h
浏览文件 @
4ed37aee
...
...
@@ -37,11 +37,6 @@ typedef struct SMetaCfg {
uint64_t
lruSize
;
}
SMetaCfg
;
typedef
struct
{
uint32_t
nCols
;
SSchema
*
pSchema
;
}
SSchemaWrapper
;
typedef
struct
SMTbCursor
SMTbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
...
...
source/dnode/vnode/inc/tq.h
浏览文件 @
4ed37aee
...
...
@@ -149,11 +149,12 @@ typedef struct STqGroup {
}
STqGroup
;
typedef
struct
STqTaskItem
{
int8_t
status
;
int64_t
offset
;
void
*
dst
;
qTaskInfo_t
task
;
SSubQueryMsg
*
pQueryMsg
;
int8_t
status
;
int64_t
offset
;
void
*
dst
;
qTaskInfo_t
task
;
STqReadHandle
*
pReadHandle
;
SSubQueryMsg
*
pQueryMsg
;
}
STqTaskItem
;
// new version
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
4ed37aee
...
...
@@ -69,14 +69,17 @@ typedef struct {
}
SVnodeOpt
;
typedef
struct
STqReadHandle
{
int64_t
ver
;
uint64_t
tbUid
;
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pMeta
;
SArray
*
pColIdList
;
int64_t
ver
;
uint64_t
tbUid
;
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
//SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
}
STqReadHandle
;
/* ------------------------ SVnode ------------------------ */
...
...
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
4ed37aee
...
...
@@ -20,6 +20,10 @@
#include "tcoding.h"
#include "thash.h"
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
// #endif
typedef
struct
{
tb_uid_t
uid
;
int32_t
sver
;
...
...
@@ -27,6 +31,9 @@ typedef struct {
}
SSchemaKey
;
struct
SMetaDB
{
#if IMPL_WITH_LOCK
pthread_rwlock_t
rwlock
;
#endif
// DB
DB
*
pTbDB
;
DB
*
pSchemaDB
;
...
...
@@ -58,6 +65,9 @@ static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static
void
metaClearTbCfg
(
STbCfg
*
pTbCfg
);
static
int
metaEncodeSchema
(
void
**
buf
,
SSchemaWrapper
*
pSW
);
static
void
*
metaDecodeSchema
(
void
*
buf
,
SSchemaWrapper
*
pSW
);
static
void
metaDBWLock
(
SMetaDB
*
pDB
);
static
void
metaDBRLock
(
SMetaDB
*
pDB
);
static
void
metaDBULock
(
SMetaDB
*
pDB
);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
...
...
@@ -130,8 +140,10 @@ void metaCloseDB(SMeta *pMeta) {
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
tb_uid_t
uid
;
char
buf
[
512
];
char
buf1
[
512
];
void
*
pBuf
;
DBT
key
,
value
;
DBT
key1
,
value1
;
DBT
key2
,
value2
;
SSchema
*
pSchema
=
NULL
;
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
...
...
@@ -143,19 +155,17 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
{
// save table info
pBuf
=
buf
;
memset
(
&
key
,
0
,
sizeof
(
key
));
memset
(
&
value
,
0
,
sizeof
(
key
));
memset
(
&
key
1
,
0
,
sizeof
(
key1
));
memset
(
&
value
1
,
0
,
sizeof
(
key1
));
key
.
data
=
&
uid
;
key
.
size
=
sizeof
(
uid
);
key
1
.
data
=
&
uid
;
key
1
.
size
=
sizeof
(
uid
);
metaEncodeTbInfo
(
&
pBuf
,
pTbCfg
);
value
.
data
=
buf
;
value
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf
);
value
.
app_data
=
pTbCfg
;
pMeta
->
pDB
->
pTbDB
->
put
(
pMeta
->
pDB
->
pTbDB
,
NULL
,
&
key
,
&
value
,
0
);
value1
.
data
=
buf
;
value1
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf
);
value1
.
app_data
=
pTbCfg
;
}
// save schema
...
...
@@ -169,22 +179,27 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
}
if
(
pSchema
)
{
pBuf
=
buf
;
memset
(
&
key
,
0
,
sizeof
(
key
));
memset
(
&
value
,
0
,
sizeof
(
key
));
pBuf
=
buf
1
;
memset
(
&
key
2
,
0
,
sizeof
(
key2
));
memset
(
&
value
2
,
0
,
sizeof
(
key2
));
SSchemaKey
schemaKey
=
{
uid
,
0
/*TODO*/
,
0
};
key
.
data
=
&
schemaKey
;
key
.
size
=
sizeof
(
schemaKey
);
key
2
.
data
=
&
schemaKey
;
key
2
.
size
=
sizeof
(
schemaKey
);
SSchemaWrapper
sw
=
{.
nCols
=
ncols
,
.
pSchema
=
pSchema
};
metaEncodeSchema
(
&
pBuf
,
&
sw
);
value
.
data
=
buf
;
value
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf
);
value2
.
data
=
buf1
;
value2
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf1
);
}
pMeta
->
pDB
->
pSchemaDB
->
put
(
pMeta
->
pDB
->
pSchemaDB
,
NULL
,
&
key
,
&
value
,
0
);
metaDBWLock
(
pMeta
->
pDB
);
pMeta
->
pDB
->
pTbDB
->
put
(
pMeta
->
pDB
->
pTbDB
,
NULL
,
&
key1
,
&
value1
,
0
);
if
(
pSchema
)
{
pMeta
->
pDB
->
pSchemaDB
->
put
(
pMeta
->
pDB
->
pSchemaDB
,
NULL
,
&
key2
,
&
value2
,
0
);
}
metaDBULock
(
pMeta
->
pDB
);
return
0
;
}
...
...
@@ -234,11 +249,18 @@ static SMetaDB *metaNewDB() {
return
NULL
;
}
#if IMPL_WITH_LOCK
pthread_rwlock_init
(
&
pDB
->
rwlock
,
NULL
);
#endif
return
pDB
;
}
static
void
metaFreeDB
(
SMetaDB
*
pDB
)
{
if
(
pDB
)
{
#if IMPL_WITH_LOCK
pthread_rwlock_destroy
(
&
pDB
->
rwlock
);
#endif
free
(
pDB
);
}
}
...
...
@@ -467,7 +489,9 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
key
.
size
=
sizeof
(
uid
);
// Query
metaDBRLock
(
pDB
);
ret
=
pDB
->
pTbDB
->
get
(
pDB
->
pTbDB
,
NULL
,
&
key
,
&
value
,
0
);
metaDBULock
(
pDB
);
if
(
ret
!=
0
)
{
return
NULL
;
}
...
...
@@ -496,7 +520,9 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
key
.
size
=
strlen
(
tbname
);
// Query
metaDBRLock
(
pDB
);
ret
=
pDB
->
pNameIdx
->
pget
(
pDB
->
pNameIdx
,
NULL
,
&
key
,
&
pkey
,
&
pvalue
,
0
);
metaDBULock
(
pDB
);
if
(
ret
!=
0
)
{
return
NULL
;
}
...
...
@@ -529,7 +555,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
key
.
size
=
sizeof
(
schemaKey
);
// Query
metaDBRLock
(
pDB
);
ret
=
pDB
->
pSchemaDB
->
get
(
pDB
->
pSchemaDB
,
NULL
,
&
key
,
&
value
,
0
);
metaDBULock
(
pDB
);
if
(
ret
!=
0
)
{
printf
(
"failed to query schema DB since %s================
\n
"
,
db_strerror
(
ret
));
return
NULL
;
...
...
@@ -687,4 +715,22 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
}
else
{
return
0
;
}
}
\ No newline at end of file
}
static
void
metaDBWLock
(
SMetaDB
*
pDB
)
{
#if IMPL_WITH_LOCK
pthread_rwlock_wrlock
(
&
(
pDB
->
rwlock
));
#endif
}
static
void
metaDBRLock
(
SMetaDB
*
pDB
)
{
#if IMPL_WITH_LOCK
pthread_rwlock_rdlock
(
&
(
pDB
->
rwlock
));
#endif
}
static
void
metaDBULock
(
SMetaDB
*
pDB
)
{
#if IMPL_WITH_LOCK
pthread_rwlock_unlock
(
&
(
pDB
->
rwlock
));
#endif
}
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4ed37aee
...
...
@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompare.h"
#include "tqInt.h"
#include "tqMetaStore.h"
#include "tcompare.h"
// static
// read next version data
...
...
@@ -484,7 +484,8 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
int
tqSerializeConsumer
(
const
STqConsumerHandle
*
pConsumer
,
STqSerializedHead
**
ppHead
)
{
int32_t
num
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
sizeof
(
STqSerializedHead
)
+
sizeof
(
int64_t
)
*
2
+
TSDB_TOPIC_FNAME_LEN
+
num
*
(
sizeof
(
int64_t
)
+
TSDB_TOPIC_FNAME_LEN
);
int32_t
sz
=
sizeof
(
STqSerializedHead
)
+
sizeof
(
int64_t
)
*
2
+
TSDB_TOPIC_FNAME_LEN
+
num
*
(
sizeof
(
int64_t
)
+
TSDB_TOPIC_FNAME_LEN
);
if
(
sz
>
(
*
ppHead
)
->
ssize
)
{
void
*
tmpPtr
=
realloc
(
*
ppHead
,
sz
);
if
(
tmpPtr
==
NULL
)
{
...
...
@@ -511,13 +512,13 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead**
*
(
int64_t
*
)
ptr
=
pTopic
->
committedOffset
;
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
}
return
0
;
}
const
void
*
tqDeserializeConsumer
(
const
STqSerializedHead
*
pHead
,
STqConsumerHandle
**
ppConsumer
)
{
STqConsumerHandle
*
pConsumer
=
*
ppConsumer
;
const
void
*
ptr
=
pHead
->
content
;
const
void
*
ptr
=
pHead
->
content
;
pConsumer
->
consumerId
=
*
(
int64_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
pConsumer
->
epoch
=
*
(
int64_t
*
)
ptr
;
...
...
@@ -668,32 +669,33 @@ int tqItemSSize() {
#endif
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
SRpcMsg
rpcMsg
;
int64_t
reqId
=
pReq
->
reqId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
reqOffset
=
pReq
->
offset
;
int64_t
fetchOffset
=
reqOffset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
SRpcMsg
rpcMsg
;
int64_t
reqId
=
pReq
->
reqId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
reqOffset
=
pReq
->
offset
;
int64_t
fetchOffset
=
reqOffset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int
rspLen
=
0
;
int
rspLen
=
0
;
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
1
,
.
pBlockData
=
NULL
};
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
ASSERT
(
pConsumer
);
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopicHandle
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
//TODO: support multiple topic in one req
//
TODO: support multiple topic in one req
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
continue
;
}
if
(
fetchOffset
==
-
1
)
{
fetchOffset
=
pTopic
->
committedOffset
+
1
;
}
int8_t
pos
;
int8_t
skip
=
0
;
if
(
fetchOffset
==
-
1
)
{
fetchOffset
=
pTopic
->
committedOffset
+
1
;
}
int8_t
pos
;
int8_t
skip
=
0
;
SWalHead
*
pHead
;
while
(
1
)
{
pos
=
fetchOffset
%
TQ_BUFFER_SIZE
;
...
...
@@ -727,7 +729,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
qSetStreamInput
(
task
,
pCont
);
//SArray<SSDataBlock>
//
SArray<SSDataBlock>
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
while
(
1
)
{
SSDataBlock
*
pDataBlock
;
...
...
@@ -741,6 +743,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break
;
}
}
//TODO copy
rsp
.
schemas
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
...
...
@@ -750,6 +754,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
continue
;
}
rsp
.
pBlockData
=
pRes
;
#if 0
pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset;
...
...
@@ -757,13 +764,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset;
}
#endif
}
// put output into rsp
SMqConsumeRsp
rsp
=
{
.
consumerId
=
consumerId
,
.
numOfTopics
=
1
};
int32_t
tlen
=
tEncodeSMqConsumeRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
pMsg
->
code
=
-
1
;
return
-
1
;
}
void
*
abuf
=
buf
;
tEncodeSMqConsumeRsp
(
&
abuf
,
&
rsp
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
rpcSendResponse
(
pMsg
);
return
0
;
}
...
...
@@ -799,6 +813,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
pTopic
->
buffer
.
output
[
i
].
status
=
0
;
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pMeta
);
pTopic
->
buffer
.
output
[
i
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
i
].
task
=
qCreateStreamExecTaskInfo
(
req
.
qmsg
,
pReadHandle
);
}
taosArrayPush
(
pConsumer
->
topics
,
pTopic
);
...
...
@@ -813,10 +828,13 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
if
(
pReadHandle
==
NULL
)
{
return
NULL
;
}
pReadHandle
->
pMeta
=
pMeta
;
pReadHandle
->
p
Vnode
Meta
=
pMeta
;
pReadHandle
->
pMsg
=
NULL
;
pReadHandle
->
ver
=
-
1
;
pReadHandle
->
pColIdList
=
NULL
;
pReadHandle
->
sver
=
-
1
;
pReadHandle
->
pSchema
=
NULL
;
pReadHandle
->
pSchemaWrapper
=
NULL
;
return
pReadHandle
;
}
...
...
@@ -837,13 +855,13 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
if
(
pHandle
->
pBlock
==
NULL
)
return
false
;
pHandle
->
pBlock
->
uid
=
htobe64
(
pHandle
->
pBlock
->
uid
);
if
(
pHandle
->
tbUid
==
pHandle
->
pBlock
->
uid
){
if
(
pHandle
->
tbUid
==
pHandle
->
pBlock
->
uid
)
{
pHandle
->
pBlock
->
tid
=
htonl
(
pHandle
->
pBlock
->
tid
);
pHandle
->
pBlock
->
sversion
=
htonl
(
pHandle
->
pBlock
->
sversion
);
pHandle
->
pBlock
->
dataLen
=
htonl
(
pHandle
->
pBlock
->
dataLen
);
pHandle
->
pBlock
->
schemaLen
=
htonl
(
pHandle
->
pBlock
->
schemaLen
);
pHandle
->
pBlock
->
numOfRows
=
htons
(
pHandle
->
pBlock
->
numOfRows
);
return
true
;
return
true
;
}
}
return
false
;
...
...
@@ -859,41 +877,71 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
}
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
)
{
int32_t
sversion
=
pHandle
->
pBlock
->
sversion
;
//TODO : change sversion
STSchema
*
pTschema
=
metaGetTbTSchema
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
,
0
);
tb_uid_t
quid
;
STbCfg
*
pTbCfg
=
metaGetTbInfoByUid
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
quid
=
pTbCfg
->
ctbCfg
.
suid
;
}
else
{
quid
=
pHandle
->
pBlock
->
uid
;
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t
sversion
=
0
;
if
(
pHandle
->
sver
!=
sversion
)
{
pHandle
->
pSchema
=
metaGetTbTSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
,
sversion
);
tb_uid_t
quid
;
STbCfg
*
pTbCfg
=
metaGetTbInfoByUid
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
quid
=
pTbCfg
->
ctbCfg
.
suid
;
}
else
{
quid
=
pHandle
->
pBlock
->
uid
;
}
pHandle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
quid
,
sversion
,
true
);
pHandle
->
sver
=
sversion
;
}
SSchemaWrapper
*
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pMeta
,
quid
,
0
,
true
);
SArray
*
pArray
=
taosArrayInit
(
pSchemaWrapper
->
nCols
,
sizeof
(
SColumnInfoData
));
STSchema
*
pTschema
=
pHandle
->
pSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
pHandle
->
pSchemaWrapper
;
int32_t
numOfRows
=
pHandle
->
pBlock
->
numOfRows
;
int32_t
numOfCols
=
pHandle
->
pSchema
->
numOfCols
;
int32_t
colNumNeed
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
SArray
*
pArray
=
taosArrayInit
(
colNumNeed
,
sizeof
(
SColumnInfoData
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
SColumnInfoData
colInfo
;
int
sz
=
pSchemaWrapper
->
nCols
*
pSchemaWrapper
->
pSchema
->
bytes
;
colInfo
.
pData
=
malloc
(
sz
);
if
(
colInfo
.
pData
==
NULL
)
{
return
NULL
;
int
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
int32_t
colId
=
*
(
int32_t
*
)
taosArrayGet
(
pHandle
->
pColIdList
,
i
);
while
(
j
<
pSchemaWrapper
->
nCols
&&
pSchemaWrapper
->
pSchema
[
j
].
colId
<
colId
)
{
j
++
;
}
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
j
];
ASSERT
(
pColSchema
->
colId
==
colId
);
SColumnInfoData
colInfo
=
{
0
};
int
sz
=
numOfRows
*
pColSchema
->
bytes
;
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
colInfo
.
pData
=
calloc
(
1
,
sz
);
if
(
colInfo
.
pData
==
NULL
)
{
// TODO free
taosArrayDestroy
(
pArray
);
return
NULL
;
}
taosArrayPush
(
pArray
,
&
colInfo
);
}
SMemRow
row
;
int32_t
kvIdx
;
int32_t
kvIdx
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
for
(
int
i
=
0
;
i
<
pTschema
->
numOfCols
&&
kvIdx
<
pTschema
->
numOfCols
;
i
++
)
{
// TODO: filter out unused column
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
// get all wanted col of that block
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pArray
,
i
);
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
// TODO: handle varlen
memcpy
(
POINTER_SHIFT
(
colInfo
.
pData
,
pCol
->
offset
),
val
,
pCol
->
bytes
);
memcpy
(
pColData
->
pData
,
val
,
pCol
->
bytes
);
}
}
taosArrayPush
(
pArray
,
&
colInfo
);
return
pArray
;
}
source/libs/executor/src/executorimpl.c
浏览文件 @
4ed37aee
...
...
@@ -5070,6 +5070,7 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
pBlockInfo
->
rows
=
0
;
while
(
tqNextDataBlock
(
pInfo
->
readerHandle
))
{
pTaskInfo
->
code
=
tqRetrieveDataBlockInfo
(
pInfo
->
readerHandle
,
pBlockInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5896,7 +5897,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
finalizeQueryResult
(
pOperator
,
pInfo
->
pCtx
,
&
pInfo
->
resultRowInfo
,
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
->
info
.
rows
=
getNumOfResult
(
pInfo
->
pCtx
,
pOperator
->
numOfOutput
);
return
pInfo
->
pRes
;
return
(
pInfo
->
pRes
->
info
.
rows
!=
0
)
?
pInfo
->
pRes
:
NULL
;
}
static
SSDataBlock
*
doSTableAggregate
(
void
*
param
,
bool
*
newgroup
)
{
...
...
@@ -8825,14 +8826,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree
(
pTaskInfo
->
sql
);
tfree
(
pTaskInfo
->
id
.
str
);
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
tfree
(
pTaskInfo
);
}
...
...
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
4ed37aee
...
...
@@ -1125,8 +1125,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
*
str
=
cJSON_Print
(
json
);
cJSON_Delete
(
json
);
// printf("====Physical plan:====\n");
// printf("%s\n", *str);
*
len
=
strlen
(
*
str
)
+
1
;
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
4ed37aee
...
...
@@ -40,7 +40,8 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree
(
pDag
);
}
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
SQueryPlanNode
*
pLogicPlan
;
int32_t
code
=
createQueryPlan
(
pNode
,
&
pLogicPlan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -49,9 +50,10 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if
(
pLogicPlan
->
info
.
type
!=
QNODE_MODIFY
)
{
// char* str = NULL;
// queryPlanToString(pLogicPlan, &str);
// printf("%s\n", str);
char
*
str
=
NULL
;
queryPlanToString
(
pLogicPlan
,
&
str
);
qDebug
(
"reqId:0x%"
PRIx64
": %s"
,
requestId
,
str
);
tfree
(
str
);
}
code
=
optimizeQueryPlan
(
pLogicPlan
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
4ed37aee
...
...
@@ -1100,8 +1100,6 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8
(
&
ctx
->
queryInQueue
,
0
);
atomic_store_8
(
&
ctx
->
queryContinue
,
0
);
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
queryEnd
));
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
4ed37aee
...
...
@@ -1194,17 +1194,18 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_ERR_RET
(
code
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SSubplan
*
plan
=
pTask
->
plan
;
if
(
NULL
==
pTask
->
msg
)
{
if
(
NULL
==
pTask
->
msg
)
{
// TODO add more detailed reason for failure
code
=
qSubPlanToString
(
plan
,
&
pTask
->
msg
,
&
pTask
->
msgLen
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
NULL
==
pTask
->
msg
||
pTask
->
msgLen
<=
0
)
{
SCH_TASK_ELOG
(
"
subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
pTask
->
msg
,
pTask
->
msgLen
);
SCH_TASK_ELOG
(
"
failed to create physical plan, code:%s, msg:%p, len:%d"
,
tstrerror
(
code
)
,
pTask
->
msg
,
pTask
->
msgLen
);
SCH_ERR_JRET
(
code
);
}
else
{
SCH_TASK_DLOG
(
" ===physical plan=== len:%d, %s"
,
pTask
->
msgLen
,
pTask
->
msg
);
}
}
...
...
@@ -1218,13 +1219,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
plan
->
msgType
));
return
TSDB_CODE_SUCCESS
;
_return:
SCH_ERR_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
SCH_RET
(
code
);
}
...
...
source/libs/transport/CMakeLists.txt
浏览文件 @
4ed37aee
...
...
@@ -13,7 +13,7 @@ target_link_libraries(
PUBLIC util
PUBLIC common
)
if
(
${
BUILD_WITH_UV
}
)
if
(
${
BUILD_WITH_UV
_TRANS
}
)
target_include_directories
(
transport
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/contrib/libuv/include"
...
...
@@ -25,7 +25,7 @@ if (${BUILD_WITH_UV})
PUBLIC uv_a
)
add_definitions
(
-DUSE_UV
)
endif
(
${
BUILD_WITH_UV
}
)
endif
(
${
BUILD_WITH_UV
_TRANS
}
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
...
...
source/libs/transport/src/transCli.c
浏览文件 @
4ed37aee
...
...
@@ -45,13 +45,13 @@ typedef struct SCliThrdObj {
pthread_t
thread
;
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
uv_timer_t
*
pT
imer
;
uv_timer_t
*
t
imer
;
void
*
pool
;
// conn pool
queue
msg
;
pthread_mutex_t
msgMtx
;
uint64_t
nextTimeout
;
// next timeout
void
*
pTransInst
;
//
bool
quit
;
}
SCliThrdObj
;
typedef
struct
SClientObj
{
...
...
@@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn);
static
void
clientHandleExcept
(
SCliConn
*
conn
);
// handle req from app
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientSendQuit
(
SCliThrdObj
*
thrd
);
static
void
destroyUserdata
(
SRpcMsg
*
userdata
);
...
...
@@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) {
destroyCmsg
(
pMsg
);
conn
->
data
=
NULL
;
// start thread's timer of conn pool if not active
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
pT
imer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
pT
imer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
t
imer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
t
imer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
}
static
void
clientHandleExcept
(
SCliConn
*
pConn
)
{
...
...
@@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
rpcMsg
.
code
=
-
1
;
rpcMsg
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(
pCtx
->
pTransInst
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
pConn
->
notifyCount
+=
1
;
...
...
@@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) {
tDebug
(
"conn %p data already was written out"
,
pConn
);
SCliMsg
*
pMsg
=
pConn
->
data
;
if
(
pMsg
==
NULL
)
{
destroy
// handle
return
;
// handle
return
;
}
destroyUserdata
(
&
pMsg
->
msg
);
}
else
{
...
...
@@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite
(
pConn
);
}
static
void
clientHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
tDebug
(
"thread %p start to quit"
,
pThrd
);
destroyCmsg
(
pMsg
);
uv_close
((
uv_handle_t
*
)
pThrd
->
cliAsync
,
NULL
);
uv_timer_stop
(
pThrd
->
timer
);
pThrd
->
quit
=
true
;
// uv__async_stop(pThrd->cliAsync);
uv_stop
(
pThrd
->
loop
);
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
el
=
et
-
pMsg
->
st
;
...
...
@@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn
->
data
=
pMsg
;
conn
->
writeReq
->
data
=
conn
;
transDestroyBuffer
(
&
conn
->
readBuf
);
if
(
pThrd
->
quit
)
{
clientHandleExcept
(
conn
);
return
;
}
clientWrite
(
conn
);
}
else
{
SCliConn
*
conn
=
calloc
(
1
,
sizeof
(
SCliConn
));
conn
->
ref
++
;
...
...
@@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE
(
h
);
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
clientHandleReq
(
pMsg
,
pThrd
);
if
(
pMsg
->
ctx
==
NULL
)
{
clientHandleQuit
(
pMsg
,
pThrd
);
}
else
{
clientHandleReq
(
pMsg
,
pThrd
);
}
// clientHandleReq(pMsg, pThrd);
count
++
;
}
if
(
count
>=
2
)
{
...
...
@@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
tDebug
(
"suc
c
ess to create tranport-client thread %d"
,
i
);
}
cli
->
pThreadObj
[
i
]
=
pThrd
;
}
...
...
@@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() {
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
pT
imer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
pT
imer
);
pThrd
->
pT
imer
->
data
=
pThrd
;
pThrd
->
t
imer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
t
imer
);
pThrd
->
t
imer
->
data
=
pThrd
;
pThrd
->
pool
=
creatConnPool
(
1
);
pThrd
->
quit
=
false
;
return
pThrd
;
}
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
)
{
if
(
pThrd
==
NULL
)
{
return
;
}
uv_stop
(
pThrd
->
loop
);
pthread_join
(
pThrd
->
thread
,
NULL
);
pthread_mutex_destroy
(
&
pThrd
->
msgMtx
);
free
(
pThrd
->
cliAsync
);
free
(
pThrd
->
timer
);
free
(
pThrd
->
loop
);
free
(
pThrd
);
}
...
...
@@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free
(
ctx
);
}
//
static
void
clientSendQuit
(
SCliThrdObj
*
thrd
)
{
// cli can stop gracefully
SCliMsg
*
msg
=
calloc
(
1
,
sizeof
(
SCliMsg
));
msg
->
ctx
=
NULL
;
//
pthread_mutex_lock
(
&
thrd
->
msgMtx
);
QUEUE_PUSH
(
&
thrd
->
msg
,
&
msg
->
q
);
pthread_mutex_unlock
(
&
thrd
->
msgMtx
);
uv_async_send
(
thrd
->
cliAsync
);
}
void
taosCloseClient
(
void
*
arg
)
{
// impl later
SClientObj
*
cli
=
arg
;
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
clientSendQuit
(
cli
->
pThreadObj
[
i
]);
destroyThrdObj
(
cli
->
pThreadObj
[
i
]);
}
free
(
cli
->
pThreadObj
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
4ed37aee
...
...
@@ -70,6 +70,7 @@ typedef struct SServerObj {
uv_pipe_t
**
pipe
;
uint32_t
ip
;
uint32_t
port
;
uv_async_t
*
pAcceptAsync
;
// just to quit from from accept thread
}
SServerObj
;
static
const
char
*
notify
=
"a"
;
...
...
@@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status);
static
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
);
static
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvPrepareSendData
(
SSrvMsg
*
msg
,
uv_buf_t
*
wb
);
static
void
uvStartSendResp
(
SSrvMsg
*
msg
);
static
void
destroySmsg
(
SSrvMsg
*
smsg
);
// check whether already read complete packet
static
bool
readComplete
(
SConnBuffer
*
buf
);
...
...
@@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError
(
"except occurred, continue"
);
continue
;
}
uvStartSendResp
(
msg
);
if
(
msg
->
pConn
==
NULL
)
{
//
free
(
msg
);
uv_stop
(
pThrd
->
loop
);
}
else
{
uvStartSendResp
(
msg
);
}
// uv_buf_t wb;
// uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
...
...
@@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
}
static
void
uvAcceptAsyncCb
(
uv_async_t
*
async
)
{
SServerObj
*
srv
=
async
->
data
;
uv_stop
(
srv
->
loop
);
}
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
)
{
if
(
status
==
-
1
)
{
...
...
@@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) {
return
false
;
}
struct
sockaddr_in
bind_addr
;
// register an async here to quit server gracefully
srv
->
pAcceptAsync
=
calloc
(
1
,
sizeof
(
uv_async_t
));
uv_async_init
(
srv
->
loop
,
srv
->
pAcceptAsync
,
uvAcceptAsyncCb
);
srv
->
pAcceptAsync
->
data
=
srv
;
struct
sockaddr_in
bind_addr
;
uv_ip4_addr
(
"0.0.0.0"
,
srv
->
port
,
&
bind_addr
);
if
((
err
=
uv_tcp_bind
(
&
srv
->
server
,
(
const
struct
sockaddr
*
)
&
bind_addr
,
0
))
!=
0
)
{
tError
(
"failed to bind: %s"
,
uv_err_name
(
err
));
...
...
@@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
return
;
}
pthread_join
(
pThrd
->
thread
,
NULL
);
// free(srv->pipe[i]);
free
(
pThrd
->
loop
);
pthread_mutex_destroy
(
&
pThrd
->
msgMtx
);
free
(
pThrd
->
workerAsync
);
free
(
pThrd
);
}
void
sendQuitToWorkThrd
(
SWorkThrdObj
*
pThrd
)
{
SSrvMsg
*
srvMsg
=
calloc
(
1
,
sizeof
(
SSrvMsg
));
pthread_mutex_lock
(
&
pThrd
->
msgMtx
);
QUEUE_PUSH
(
&
pThrd
->
msg
,
&
srvMsg
->
q
);
pthread_mutex_unlock
(
&
pThrd
->
msgMtx
);
tDebug
(
"send quit msg to work thread"
);
uv_async_send
(
pThrd
->
workerAsync
);
}
void
taosCloseServer
(
void
*
arg
)
{
// impl later
SServerObj
*
srv
=
arg
;
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
sendQuitToWorkThrd
(
srv
->
pThreadObj
[
i
]);
destroyWorkThrd
(
srv
->
pThreadObj
[
i
]);
}
tDebug
(
"send quit msg to accept thread"
);
uv_async_send
(
srv
->
pAcceptAsync
);
pthread_join
(
srv
->
thread
,
NULL
);
free
(
srv
->
pThreadObj
);
free
(
srv
->
pAcceptAsync
);
free
(
srv
->
loop
);
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
free
(
srv
->
pipe
[
i
]);
}
free
(
srv
->
pipe
);
free
(
srv
->
pThreadObj
);
pthread_join
(
srv
->
thread
,
NULL
);
free
(
srv
);
}
...
...
source/libs/transport/test/CMakeLists.txt
浏览文件 @
4ed37aee
add_executable
(
transportTest
""
)
add_executable
(
client
""
)
add_executable
(
server
""
)
add_executable
(
transUT
""
)
target_sources
(
transUT
PRIVATE
"transUT.cc"
)
target_sources
(
transportTest
PRIVATE
...
...
@@ -28,6 +34,13 @@ target_link_libraries (transportTest
gtest_main
transport
)
target_link_libraries
(
transUT
os
util
common
gtest_main
transport
)
target_include_directories
(
client
PUBLIC
...
...
@@ -48,6 +61,13 @@ target_include_directories(server
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
transUT
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/transport"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
server
os
util
...
...
source/libs/transport/test/transUT.cc
0 → 100644
浏览文件 @
4ed37aee
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "trpc.h"
using
namespace
std
;
class
TransObj
{
public:
TransObj
()
{
const
char
*
label
=
"APP"
;
const
char
*
secret
=
"secret"
;
const
char
*
user
=
"user"
;
const
char
*
ckey
=
"ckey"
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
(
char
*
)
label
;
rpcInit
.
numOfThreads
=
5
;
rpcInit
.
cfp
=
NULL
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
secret
=
(
char
*
)
secret
;
rpcInit
.
ckey
=
(
char
*
)
ckey
;
rpcInit
.
spi
=
1
;
}
bool
startCli
()
{
trans
=
NULL
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
trans
=
rpcOpen
(
&
rpcInit
);
return
trans
!=
NULL
?
true
:
false
;
}
bool
startSrv
()
{
trans
=
NULL
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
trans
=
rpcOpen
(
&
rpcInit
);
return
trans
!=
NULL
?
true
:
false
;
}
bool
stop
()
{
rpcClose
(
trans
);
trans
=
NULL
;
return
true
;
}
private:
void
*
trans
;
SRpcInit
rpcInit
;
};
class
TransEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
// set up trans obj
tr
=
new
TransObj
();
}
virtual
void
TearDown
()
{
// tear down
delete
tr
;
}
TransObj
*
tr
=
NULL
;
};
TEST_F
(
TransEnv
,
test_start_stop
)
{
assert
(
tr
->
startCli
());
assert
(
tr
->
stop
());
assert
(
tr
->
startSrv
());
assert
(
tr
->
stop
());
}
tools/shell/src/shellEngine.c
浏览文件 @
4ed37aee
...
...
@@ -360,6 +360,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
}
}
else
{
int
num_rows_affacted
=
taos_affected_rows
(
pSql
);
taos_free_result
(
pSql
);
et
=
taosGetTimestampUs
();
printf
(
"Query OK, %d of %d row(s) in database (%.6fs)
\n
"
,
num_rows_affacted
,
num_rows_affacted
,
(
et
-
st
)
/
1E6
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录