Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6be271c8
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6be271c8
编写于
1月 18, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode
上级
f5d998ee
17a26c47
变更
32
展开全部
隐藏空白更改
内联
并排
Showing
32 changed file
with
2482 addition
and
1244 deletion
+2482
-1244
include/common/tmsg.h
include/common/tmsg.h
+146
-0
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+1
-1
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+1
-1
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+16
-0
include/util/taoserror.h
include/util/taoserror.h
+5
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+18
-15
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+27
-24
source/common/src/tmsg.c
source/common/src/tmsg.c
+1
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+7
-3
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+78
-9
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+32
-4
source/dnode/mnode/impl/test/profile/profile.cpp
source/dnode/mnode/impl/test/profile/profile.cpp
+5
-5
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+21
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+67
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+19
-33
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+2
-2
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+15
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+3
-0
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-0
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+71
-27
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+48
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+743
-1092
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+553
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+91
-0
source/libs/transport/inc/rpcLog.h
source/libs/transport/inc/rpcLog.h
+43
-7
source/libs/transport/src/transport.c
source/libs/transport/src/transport.c
+36
-9
source/libs/transport/test/CMakeLists.txt
source/libs/transport/test/CMakeLists.txt
+37
-0
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+196
-0
source/libs/transport/test/rserver.c
source/libs/transport/test/rserver.c
+188
-0
source/util/src/terror.c
source/util/src/terror.c
+8
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
6be271c8
...
...
@@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return
buf
;
}
typedef
struct
SMqHbVgInfo
{
int32_t
vgId
;
}
SMqHbVgInfo
;
static
FORCE_INLINE
int
taosEncodeSMqVgInfo
(
void
**
buf
,
const
SMqHbVgInfo
*
pVgInfo
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pVgInfo
->
vgId
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSMqVgInfo
(
void
*
buf
,
SMqHbVgInfo
*
pVgInfo
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pVgInfo
->
vgId
);
return
buf
;
}
typedef
struct
SMqHbTopicInfo
{
int32_t
epoch
;
int64_t
topicUid
;
char
name
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
pVgInfo
;
}
SMqHbTopicInfo
;
static
FORCE_INLINE
int
taosEncodeSMqHbTopicInfoMsg
(
void
**
buf
,
const
SMqHbTopicInfo
*
pTopicInfo
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pTopicInfo
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pTopicInfo
->
topicUid
);
tlen
+=
taosEncodeString
(
buf
,
pTopicInfo
->
name
);
int32_t
sz
=
taosArrayGetSize
(
pTopicInfo
->
pVgInfo
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbVgInfo
*
pVgInfo
=
(
SMqHbVgInfo
*
)
taosArrayGet
(
pTopicInfo
->
pVgInfo
,
i
);
tlen
+=
taosEncodeSMqVgInfo
(
buf
,
pVgInfo
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSMqHbTopicInfoMsg
(
void
*
buf
,
SMqHbTopicInfo
*
pTopicInfo
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pTopicInfo
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pTopicInfo
->
topicUid
);
buf
=
taosDecodeStringTo
(
buf
,
pTopicInfo
->
name
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pTopicInfo
->
pVgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqHbVgInfo
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbVgInfo
vgInfo
;
buf
=
taosDecodeSMqVgInfo
(
buf
,
&
vgInfo
);
taosArrayPush
(
pTopicInfo
->
pVgInfo
,
&
vgInfo
);
}
return
buf
;
}
typedef
struct
SMqHbMsg
{
int32_t
status
;
// ask hb endpoint
int32_t
epoch
;
int64_t
consumerId
;
SArray
*
pTopics
;
// SArray<SMqHbTopicInfo>
}
SMqHbMsg
;
static
FORCE_INLINE
int
taosEncodeSMqMsg
(
void
**
buf
,
const
SMqHbMsg
*
pMsg
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pMsg
->
status
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pMsg
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pMsg
->
consumerId
);
int32_t
sz
=
taosArrayGetSize
(
pMsg
->
pTopics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbTopicInfo
*
topicInfo
=
(
SMqHbTopicInfo
*
)
taosArrayGet
(
pMsg
->
pTopics
,
i
);
tlen
+=
taosEncodeSMqHbTopicInfoMsg
(
buf
,
topicInfo
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSMqMsg
(
void
*
buf
,
SMqHbMsg
*
pMsg
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pMsg
->
status
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pMsg
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pMsg
->
consumerId
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pMsg
->
pTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqHbTopicInfo
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbTopicInfo
topicInfo
;
buf
=
taosDecodeSMqHbTopicInfoMsg
(
buf
,
&
topicInfo
);
taosArrayPush
(
pMsg
->
pTopics
,
&
topicInfo
);
}
return
buf
;
}
typedef
struct
{
int32_t
vgId
;
...
...
@@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
return
buf
;
}
typedef
struct
SMqHbOneTopicBatchRsp
{
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
rsps
;
// SArray<SMqHbRsp>
}
SMqHbOneTopicBatchRsp
;
static
FORCE_INLINE
int
taosEncodeSMqHbOneTopicBatchRsp
(
void
**
buf
,
const
SMqHbOneTopicBatchRsp
*
pBatchRsp
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pBatchRsp
->
topicName
);
int32_t
sz
=
taosArrayGetSize
(
pBatchRsp
->
rsps
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbRsp
*
pRsp
=
(
SMqHbRsp
*
)
taosArrayGet
(
pBatchRsp
->
rsps
,
i
);
tlen
+=
taosEncodeSMqHbRsp
(
buf
,
pRsp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSMqHbOneTopicBatchRsp
(
void
*
buf
,
SMqHbOneTopicBatchRsp
*
pBatchRsp
)
{
int32_t
sz
;
buf
=
taosDecodeStringTo
(
buf
,
pBatchRsp
->
topicName
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pBatchRsp
->
rsps
=
taosArrayInit
(
sz
,
sizeof
(
SMqHbRsp
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbRsp
rsp
;
buf
=
taosDecodeSMqHbRsp
(
buf
,
&
rsp
);
buf
=
taosArrayPush
(
pBatchRsp
->
rsps
,
&
rsp
);
}
return
buf
;
}
typedef
struct
SMqHbBatchRsp
{
int64_t
consumerId
;
SArray
*
batchRsps
;
// SArray<SMqHbOneTopicBatchRsp>
}
SMqHbBatchRsp
;
static
FORCE_INLINE
int
taosEncodeSMqHbBatchRsp
(
void
**
buf
,
const
SMqHbBatchRsp
*
pBatchRsp
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pBatchRsp
->
consumerId
);
int32_t
sz
;
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbOneTopicBatchRsp
*
pRsp
=
(
SMqHbOneTopicBatchRsp
*
)
taosArrayGet
(
pBatchRsp
->
batchRsps
,
i
);
tlen
+=
taosEncodeSMqHbOneTopicBatchRsp
(
buf
,
pRsp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSMqHbBatchRsp
(
void
*
buf
,
SMqHbBatchRsp
*
pBatchRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pBatchRsp
->
consumerId
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pBatchRsp
->
batchRsps
=
taosArrayInit
(
sz
,
sizeof
(
SMqHbOneTopicBatchRsp
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbOneTopicBatchRsp
rsp
;
buf
=
taosDecodeSMqHbOneTopicBatchRsp
(
buf
,
&
rsp
);
buf
=
taosArrayPush
(
pBatchRsp
->
batchRsps
,
&
rsp
);
}
return
buf
;
}
typedef
struct
{
int32_t
acctId
;
int64_t
clusterId
;
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
6be271c8
...
...
@@ -48,7 +48,7 @@ typedef struct SOutputData {
int8_t
compressed
;
char
*
pData
;
bool
queryEnd
;
bool
needSchedule
;
int32_t
scheduleJobNo
;
int32_t
bufStatus
;
int64_t
useconds
;
int8_t
precision
;
...
...
include/libs/qworker/qworker.h
浏览文件 @
6be271c8
...
...
@@ -55,7 +55,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcess
QueryContinue
Msg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcess
CQuery
Msg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
6be271c8
...
...
@@ -59,6 +59,11 @@ typedef struct SQueryResult {
char
*
msg
;
}
SQueryResult
;
typedef
struct
STaskInfo
{
SQueryNodeAddr
addr
;
SSubQueryMsg
*
msg
;
}
STaskInfo
;
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
);
/**
...
...
@@ -101,6 +106,17 @@ void scheduleFreeJob(void *pJob);
void
schedulerDestroy
(
void
);
/**
* convert dag to task list
* @param pDag
* @param pTasks SArray**<STaskInfo>
* @return
*/
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
);
void
schedulerFreeTaskList
(
SArray
*
taskList
);
#ifdef __cplusplus
}
#endif
...
...
include/util/taoserror.h
浏览文件 @
6be271c8
...
...
@@ -356,7 +356,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
...
...
source/client/src/clientImpl.c
浏览文件 @
6be271c8
...
...
@@ -322,24 +322,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
taosArrayDestroy
(
pArray
);
return
NULL
;
}
strcpy
(
kv
.
key
,
"groupId"
);
kv
.
keyLen
=
strlen
(
"groupId"
)
+
1
;
kv
.
value
=
malloc
(
256
);
if
(
kv
.
value
==
NULL
)
{
free
(
kv
.
key
);
taosArrayDestroy
(
pArray
);
return
NULL
;
strcpy
(
kv
.
key
,
"mq-tmp"
);
kv
.
keyLen
=
strlen
(
"mq-tmp"
)
+
1
;
SMqHbMsg
*
pMqHb
=
malloc
(
sizeof
(
SMqHbMsg
));
if
(
pMqHb
==
NULL
)
{
return
pArray
;
}
pMqHb
->
consumerId
=
connKey
.
connId
;
SArray
*
clientTopics
=
pTmq
->
clientTopics
;
int
sz
=
taosArrayGetSize
(
clientTopics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
*
pCTopic
=
taosArrayGet
(
clientTopics
,
i
);
if
(
pCTopic
->
vgId
==
-
1
)
{
pMqHb
->
status
=
1
;
break
;
}
}
strcpy
(
kv
.
value
,
pTmq
->
groupId
);
kv
.
valueLen
=
strlen
(
pTmq
->
groupId
)
+
1
;
kv
.
value
=
pMqHb
;
kv
.
valueLen
=
sizeof
(
SMqHbMsg
);
taosArrayPush
(
pArray
,
&
kv
);
strcpy
(
kv
.
key
,
"clientUid"
);
kv
.
keyLen
=
strlen
(
"clientUid"
)
+
1
;
*
(
uint32_t
*
)
kv
.
value
=
pTmq
->
pTscObj
->
connId
;
kv
.
valueLen
=
sizeof
(
uint32_t
);
return
NULL
;
return
pArray
;
}
tmq_t
*
tmqCreateConsumerImpl
(
TAOS
*
conn
,
tmq_conf_t
*
conf
)
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
6be271c8
...
...
@@ -277,12 +277,15 @@ TEST(testCase, connect_Test) {
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
// pRes = taos_query(pConn, "drop stable `123_$^)`");
// if (taos_errno(pRes) != 0) {
...
...
@@ -518,30 +521,30 @@ TEST(testCase, show_stable_Test) {
// taosHashCleanup(phash);
//}
//
//
TEST(testCase, create_topic_Test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
assert(pConn != NULL);
//
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("error in use db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//
ASSERT_TRUE(pFields == nullptr);
//
//
int32_t numOfFields = taos_num_fields(pRes);
//
ASSERT_EQ(numOfFields, 0);
//
//
taos_free_result(pRes);
//
//
char* sql = "select * from tu";
//
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
//
TEST
(
testCase
,
create_topic_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
taos_free_result
(
pRes
);
char
*
sql
=
"select * from tu"
;
pRes
=
taos_create_topic
(
pConn
,
"test_topic_1"
,
sql
,
strlen
(
sql
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
...
...
source/common/src/tmsg.c
浏览文件 @
6be271c8
...
...
@@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t
kvNum
=
taosHashGetSize
(
pReq
->
info
);
tlen
+=
taosEncodeFixedI32
(
buf
,
kvNum
);
SKv
kv
;
void
*
pIter
=
taosHashIterate
(
pReq
->
info
,
pIter
);
void
*
pIter
=
taosHashIterate
(
pReq
->
info
,
NULL
);
while
(
pIter
!=
NULL
)
{
taosHashGetKey
(
pIter
,
&
kv
.
key
,
(
size_t
*
)
&
kv
.
keyLen
);
kv
.
valueLen
=
taosHashGetDataLen
(
pIter
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
6be271c8
...
...
@@ -346,19 +346,23 @@ typedef struct SMqTopicObj {
char
*
logicalPlan
;
char
*
physicalPlan
;
SHashObj
*
cgroups
;
// SHashObj<SMqCGroup>
SHashObj
*
consumers
;
// SHashObj<SMqConsumerObj>
}
SMqTopicObj
;
// TODO: add cache and change name to id
typedef
struct
SMqConsumerTopic
{
char
name
[
TSDB_TOPIC_NAME_LEN
];
SList
*
vgroups
;
// SList<int32_t>
int32_t
epoch
;
char
name
[
TSDB_TOPIC_NAME_LEN
];
//TODO: replace with something with ep
SList
*
vgroups
;
// SList<int32_t>
}
SMqConsumerTopic
;
typedef
struct
SMqConsumerObj
{
SRWLatch
lock
;
int64_t
consumerId
;
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
SHashObj
*
topicHash
;
}
SMqConsumerObj
;
typedef
struct
SMqSubConsumerObj
{
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
6be271c8
...
...
@@ -15,10 +15,13 @@
#define _DEFAULT_SOURCE
#include "mndProfile.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTopic.h"
#include "mndUser.h"
#include "mndVgroup.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
...
...
@@ -269,29 +272,95 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
return
TSDB_CODE_SUCCESS
;
}
static
SClientHbRsp
*
mndMqHbBuildRsp
(
SMnode
*
pMnode
,
SClientHbReq
*
pReq
)
{
SClientHbRsp
*
pRsp
=
malloc
(
sizeof
(
SClientHbRsp
));
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pRsp
->
connKey
=
pReq
->
connKey
;
SMqHbBatchRsp
batchRsp
;
batchRsp
.
batchRsps
=
taosArrayInit
(
0
,
sizeof
(
SMqHbRsp
));
if
(
batchRsp
.
batchRsps
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SClientHbKey
connKey
=
pReq
->
connKey
;
SHashObj
*
pObj
=
pReq
->
info
;
SKv
*
pKv
=
taosHashGet
(
pObj
,
"mq-tmp"
,
strlen
(
"mq-tmp"
)
+
1
);
if
(
pKv
==
NULL
)
{
free
(
pRsp
);
return
NULL
;
}
SMqHbMsg
mqHb
;
taosDecodeSMqMsg
(
pKv
->
value
,
&
mqHb
);
/*int64_t clientUid = htonl(pKv->value);*/
/*if (mqHb.epoch )*/
int
sz
=
taosArrayGetSize
(
mqHb
.
pTopics
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
mqHb
.
consumerId
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqHbOneTopicBatchRsp
innerBatchRsp
;
innerBatchRsp
.
rsps
=
taosArrayInit
(
sz
,
sizeof
(
SMqHbRsp
));
if
(
innerBatchRsp
.
rsps
==
NULL
)
{
//TODO
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SMqHbTopicInfo
*
topicInfo
=
taosArrayGet
(
mqHb
.
pTopics
,
i
);
SMqConsumerTopic
*
pConsumerTopic
=
taosHashGet
(
pConsumer
->
topicHash
,
topicInfo
->
name
,
strlen
(
topicInfo
->
name
)
+
1
);
if
(
pConsumerTopic
->
epoch
!=
topicInfo
->
epoch
)
{
//add new vgids into rsp
int
vgSz
=
taosArrayGetSize
(
topicInfo
->
pVgInfo
);
for
(
int
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqHbRsp
innerRsp
;
SMqHbVgInfo
*
pVgInfo
=
taosArrayGet
(
topicInfo
->
pVgInfo
,
i
);
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
pVgInfo
->
vgId
);
innerRsp
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
taosArrayPush
(
innerBatchRsp
.
rsps
,
&
innerRsp
);
}
}
taosArrayPush
(
batchRsp
.
batchRsps
,
&
innerBatchRsp
);
}
int32_t
tlen
=
taosEncodeSMqHbBatchRsp
(
NULL
,
&
batchRsp
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
//TODO
return
NULL
;
}
void
*
abuf
=
buf
;
taosEncodeSMqHbBatchRsp
(
&
abuf
,
&
batchRsp
);
pRsp
->
body
=
buf
;
pRsp
->
bodyLen
=
tlen
;
return
pRsp
;
}
static
int32_t
mndProcessHeartBeatReq
(
SMnodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pMnode
;
char
*
batchReqStr
=
pReq
->
rpcMsg
.
pCont
;
SMnode
*
pMnode
=
pReq
->
pMnode
;
char
*
batchReqStr
=
pReq
->
rpcMsg
.
pCont
;
SClientHbBatchReq
batchReq
=
{
0
};
tDeserializeSClientHbBatchReq
(
batchReqStr
,
&
batchReq
);
SArray
*
pArray
=
batchReq
.
reqs
;
int
sz
=
taosArrayGetSize
(
pArray
);
int
sz
=
taosArrayGetSize
(
pArray
);
SClientHbBatchRsp
batchRsp
=
{
0
};
batchRsp
.
rsps
=
taosArrayInit
(
0
,
sizeof
(
SClientHbRsp
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SClientHbReq
*
pHbReq
=
taosArrayGet
(
pArray
,
i
);
SClientHbReq
*
pHbReq
=
taosArrayGet
(
pArray
,
i
);
if
(
pHbReq
->
connKey
.
hbType
==
HEARTBEAT_TYPE_QUERY
)
{
}
else
if
(
pHbReq
->
connKey
.
hbType
==
HEARTBEAT_TYPE_MQ
)
{
SClientHbRsp
rsp
=
{.
status
=
0
,
.
connKey
=
pHbReq
->
connKey
,
.
bodyLen
=
0
,
.
body
=
NULL
};
taosArrayPush
(
batchRsp
.
rsps
,
&
rsp
);
SClientHbRsp
*
pRsp
=
mndMqHbBuildRsp
(
pMnode
,
pHbReq
);
if
(
pRsp
!=
NULL
)
{
taosArrayPush
(
batchRsp
.
rsps
,
pRsp
);
free
(
pRsp
);
}
}
}
int32_t
tlen
=
tSerializeSClientHbBatchRsp
(
NULL
,
&
batchRsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
bufCopy
=
buf
;
tSerializeSClientHbBatchRsp
(
&
bufCopy
,
&
batchRsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
abuf
=
buf
;
tSerializeSClientHbBatchRsp
(
&
abuf
,
&
batchRsp
);
pReq
->
contLen
=
tlen
;
pReq
->
pCont
=
buf
;
return
0
;
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
6be271c8
...
...
@@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
version
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
logicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
logicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
pTopic
->
physicalPlan
=
calloc
(
physicalPlanLen
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
physicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
TOPIC_ENCODE_OVER
);
...
...
@@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
TOPIC_ENCODE_OVER:
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"topic:%s, failed to encode to raw:%p since %s"
,
pTopic
->
name
,
pRaw
,
terrstr
());
/*if (pTopic->logicalPlan) {*/
/*free(pTopic->logicalPlan);*/
/*}*/
/*if (pTopic->physicalPlan) {*/
/*free(pTopic->physicalPlan);*/
/*}*/
sdbFreeRaw
(
pRaw
);
return
NULL
;
}
...
...
@@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic
->
sql
=
calloc
(
pTopic
->
sqlLen
+
1
,
sizeof
(
char
));
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
logicalPlan
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
{
free
(
pTopic
->
logicalPlan
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
...
...
source/dnode/mnode/impl/test/profile/profile.cpp
浏览文件 @
6be271c8
...
...
@@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchRsp
rsp
=
{
0
};
tDeserializeSClientHbBatchRsp
(
pRspChar
,
&
rsp
);
int
sz
=
taosArrayGetSize
(
rsp
.
rsps
);
ASSERT_EQ
(
sz
,
1
);
SClientHbRsp
*
pRsp
=
(
SClientHbRsp
*
)
taosArrayGet
(
rsp
.
rsps
,
0
);
EXPECT_EQ
(
pRsp
->
connKey
.
connId
,
123
);
EXPECT_EQ
(
pRsp
->
connKey
.
hbType
,
HEARTBEAT_TYPE_MQ
);
EXPECT_EQ
(
pRsp
->
status
,
0
);
ASSERT_EQ
(
sz
,
0
);
//
SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
//
EXPECT_EQ(pRsp->connKey.connId, 123);
//
EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
//
EXPECT_EQ(pRsp->status, 0);
#if 0
int32_t contLen = sizeof(SHeartBeatReq);
...
...
source/dnode/vnode/inc/tq.h
浏览文件 @
6be271c8
...
...
@@ -25,6 +25,7 @@
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#include "meta.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static
int
tqQueryExecuting
(
int32_t
status
)
{
return
status
;
}
typedef
struct
STqReadHandle
{
int64_t
ver
;
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pMeta
;
}
STqReadHandle
;
typedef
struct
SSubmitBlkScanInfo
{
}
SSubmitBlkScanInfo
;
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
,
SSubmitMsg
*
pMsg
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
);
//return SArray<SColumnInfoData>
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
,
SArray
*
pColumnIdList
);
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
6be271c8
...
...
@@ -607,3 +607,70 @@ int tqItemSSize() {
// mainly for executor
return
0
;
}
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
,
SSubmitMsg
*
pMsg
)
{
STqReadHandle
*
pReadHandle
=
malloc
(
sizeof
(
STqReadHandle
));
if
(
pReadHandle
==
NULL
)
{
return
NULL
;
}
pReadHandle
->
pMeta
=
pMeta
;
pReadHandle
->
pMsg
=
pMsg
;
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
);
pReadHandle
->
ver
=
-
1
;
return
NULL
;
}
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
)
{
if
(
tGetSubmitMsgNext
(
&
pHandle
->
msgIter
,
&
pHandle
->
pBlock
)
<
0
)
{
return
false
;
}
return
true
;
}
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
)
{
SMemRow
row
;
int32_t
sversion
=
pHandle
->
pBlock
->
sversion
;
SSchemaWrapper
*
pSchema
=
metaGetTableSchema
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
,
sversion
,
false
);
pBlockInfo
->
numOfCols
=
pSchema
->
nCols
;
pBlockInfo
->
rows
=
pHandle
->
pBlock
->
numOfRows
;
pBlockInfo
->
uid
=
pHandle
->
pBlock
->
uid
;
//TODO: filter out unused column
return
0
;
}
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
,
SArray
*
pColumnIdList
)
{
int32_t
sversion
=
pHandle
->
pBlock
->
sversion
;
SSchemaWrapper
*
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
,
sversion
,
true
);
STSchema
*
pTschema
=
metaGetTbTSchema
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
,
sversion
);
SArray
*
pArray
=
taosArrayInit
(
pSchemaWrapper
->
nCols
,
sizeof
(
SColumnInfoData
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
SColumnInfoData
colInfo
;
int
sz
=
pSchemaWrapper
->
nCols
*
pSchemaWrapper
->
pSchema
->
bytes
;
colInfo
.
pData
=
malloc
(
sz
);
if
(
colInfo
.
pData
==
NULL
)
{
return
NULL
;
}
for
(
int
i
=
0
;
i
<
pTschema
->
numOfCols
;
i
++
)
{
//TODO: filter out unused column
taosArrayPush
(
pColumnIdList
,
&
(
schemaColAt
(
pTschema
,
i
)
->
colId
));
}
SMemRow
row
;
int32_t
kvIdx
;
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
for
(
int
i
=
0
;
i
<
pTschema
->
numOfCols
&&
kvIdx
<
pTschema
->
numOfCols
;
i
++
)
{
//TODO: filter out unused column
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
//TODO: handle varlen
memcpy
(
POINTER_SHIFT
(
colInfo
.
pData
,
pCol
->
offset
),
val
,
pCol
->
bytes
);
}
}
taosArrayPush
(
pArray
,
&
colInfo
);
return
pArray
;
}
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
/*return 0;*/
/*}*/
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
6be271c8
...
...
@@ -85,7 +85,6 @@ enum {
typedef
struct
STableCheckInfo
{
uint64_t
tableId
;
TSKEY
lastKey
;
STable
*
pTableObj
;
SBlockInfo
*
pCompInfo
;
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
...
...
@@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
STableBlockInfo
*
pDataBlockInfo
;
SDataCols
*
pDataCols
;
// in order to hold current file data block
int32_t
allocSize
;
// allocated data block size
// STsdb
// STsdbMemTable * pMemTable;
SArray
*
defaultLoadColumn
;
// default load column
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQueryAttr */
...
...
@@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
pLocalIdList
,
0
);
// the primary timestamp column does not be included in the the specified load column list, add it
if
(
loadTS
&&
colId
!=
0
)
{
int16_t
columnId
=
0
;
if
(
loadTS
&&
colId
!=
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
int16_t
columnId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
taosArrayInsert
(
pLocalIdList
,
0
,
&
columnId
);
}
...
...
@@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
pTableObj
=
pKeyInfo
->
pTable
};
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
};
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
...
...
@@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
size_t
gsize
=
taosArrayGetSize
(
pTableCheckInfo
);
for
(
int32_t
i
=
0
;
i
<
gsize
;
++
i
)
{
STableCheckInfo
*
pInfo
=
(
STableCheckInfo
*
)
taosArrayGet
(
pTableCheckInfo
,
i
);
taosArrayPush
(
pTable
,
&
pInfo
->
pTableObj
);
}
// for (int32_t i = 0; i < gsize; ++i) {
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
// }
*
psTable
=
pTable
;
return
pTableCheckInfo
;
...
...
@@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
// only one table, not need to sort again
static
SArray
*
createCheckInfoFromCheckInfo
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
skey
,
SArray
**
psTable
)
{
SArray
*
pNew
=
taosArrayInit
(
1
,
sizeof
(
STableCheckInfo
));
SArray
*
pTable
=
taosArrayInit
(
1
,
sizeof
(
STable
*
));
STableCheckInfo
info
=
{
.
lastKey
=
skey
,
.
pTableObj
=
pCheckInfo
->
pTableObj
};
STableCheckInfo
info
=
{
.
lastKey
=
skey
};
info
.
tableId
=
pCheckInfo
->
tableId
;
taosArrayPush
(
pNew
,
&
info
);
taosArrayPush
(
pTable
,
&
pCheckInfo
->
pTableObj
);
*
psTable
=
pTable
;
return
pNew
;
}
...
...
@@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle
->
defaultLoadColumn
=
getDefaultLoadColumns
(
pReadHandle
,
true
);
}
// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
// assert(pMeta != NULL);
pReadHandle
->
pDataCols
=
tdNewDataCols
(
1000
,
pReadHandle
->
pTsdb
->
config
.
maxRowsPerFileBlock
);
if
(
pReadHandle
->
pDataCols
==
NULL
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, %"
PRIu64
,
pReadHandle
,
pReadHandle
->
qId
);
...
...
@@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
size_t
size
=
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
);
SArray
*
res
=
taosArrayInit
(
size
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
i
);
taosArrayPush
(
res
,
&
pCheckInfo
->
pTableObj
);
}
return
res
;
}
...
...
@@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
index
);
pCheckInfo
->
numOfBlocks
=
0
;
if
(
tsdbSetReadTable
(
&
pTsdbReadHandle
->
rhelper
,
pCheckInfo
->
pTableObj
)
!=
TSDB_CODE_SUCCESS
)
{
STable
table
=
{.
uid
=
pCheckInfo
->
tableId
,
.
tid
=
pCheckInfo
->
tableId
};
table
.
pSchema
=
metaGetTbTSchema
(
pTsdbReadHandle
->
pTsdb
->
pMeta
,
pCheckInfo
->
tableId
,
0
);
if
(
tsdbSetReadTable
(
&
pTsdbReadHandle
->
rhelper
,
&
table
)
!=
TSDB_CODE_SUCCESS
)
{
code
=
terrno
;
return
code
;
}
...
...
@@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
static
int32_t
doLoadFileDataBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
SBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
int32_t
slotIndex
)
{
int64_t
st
=
taosGetTimestampUs
();
STSchema
*
pSchema
=
NULL
;
//tsdbGetTableSchema(pCheckInfo->pTableObj
);
STSchema
*
pSchema
=
metaGetTbTSchema
(
pTsdbReadHandle
->
pTsdb
->
pMeta
,
pCheckInfo
->
tableId
,
0
);
int32_t
code
=
tdInitDataCols
(
pTsdbReadHandle
->
pDataCols
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, 0x%"
PRIx64
,
pTsdbReadHandle
,
pTsdbReadHandle
->
qId
);
...
...
@@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
pBlockLoadInfo
->
fileGroup
=
pTsdbReadHandle
->
pFileGroup
;
pBlockLoadInfo
->
slot
=
pTsdbReadHandle
->
cur
.
slot
;
pBlockLoadInfo
->
uid
=
pCheckInfo
->
pTableObj
->
ui
d
;
pBlockLoadInfo
->
uid
=
pCheckInfo
->
tableI
d
;
SDataCols
*
pCols
=
pTsdbReadHandle
->
rhelper
.
pDCols
[
0
];
assert
(
pCols
->
numOfRows
!=
0
&&
pCols
->
numOfRows
<=
pBlock
->
numOfRows
);
...
...
@@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:-
1
;
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pTsdbReadHandle
));
STable
*
pTable
=
pCheckInfo
->
pTableObj
;
STable
*
pTable
=
NULL
;
int32_t
endPos
=
getEndPosInDataBlock
(
pTsdbReadHandle
,
&
blockInfo
);
tsdbDebug
(
"%p uid:%"
PRIu64
" start merge data block, file block range:%"
PRIu64
"-%"
PRIu64
" rows:%d, start:%d,"
...
...
@@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2
=
memRowVersion
(
row2
);
}
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
row1
,
row2
,
numOfCols
,
p
Table
,
pSchema1
,
pSchema2
,
true
);
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
row1
,
row2
,
numOfCols
,
p
CheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
true
);
numOfRows
+=
1
;
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
key
;
...
...
@@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
bool
forceSetNull
=
pCfg
->
update
!=
TD_ROW_PARTIAL_UPDATE
;
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
row1
,
row2
,
numOfCols
,
p
Table
,
pSchema1
,
pSchema2
,
forceSetNull
);
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
row1
,
row2
,
numOfCols
,
p
CheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
forceSetNull
);
numOfRows
+=
1
;
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
key
;
...
...
@@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
0
,
pRow
,
NULL
,
numOfCols
,
pCheckInfo
->
pTableObj
,
NULL
,
NULL
,
true
);
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
0
,
pRow
,
NULL
,
numOfCols
,
pCheckInfo
->
tableId
,
NULL
,
NULL
,
true
);
tfree
(
pRow
);
// update the last key value
...
...
@@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
if
(
pHandle
->
cur
.
mixBlock
)
{
return
pHandle
->
pColumns
;
}
else
{
SDataBlockInfo
binfo
=
{
0
};
/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
SDataBlockInfo
binfo
=
GET_FILE_DATA_BLOCK_INFO
(
pCheckInfo
,
pBlockInfo
->
compBlock
);
assert
(
pHandle
->
realNumOfRows
<=
binfo
.
rows
);
// data block has been loaded, todo extract method
SDataBlockLoadInfo
*
pBlockLoadInfo
=
&
pHandle
->
dataBlockLoadInfo
;
if
(
pBlockLoadInfo
->
slot
==
pHandle
->
cur
.
slot
&&
pBlockLoadInfo
->
fileGroup
->
fid
==
pHandle
->
cur
.
fid
&&
pBlockLoadInfo
->
uid
==
pCheckInfo
->
pTableObj
->
ti
d
)
{
pBlockLoadInfo
->
uid
==
pCheckInfo
->
tableI
d
)
{
return
pHandle
->
pColumns
;
}
else
{
// only load the file block
SBlock
*
pBlock
=
pBlockInfo
->
compBlock
;
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
6be271c8
...
...
@@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
int
numOfColIds
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
==
0
||
pBlock
->
numOfSubBlocks
==
1
);
ASSERT
(
colIds
[
0
]
==
0
);
ASSERT
(
colIds
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
SDFile
*
pDFile
=
(
pBlock
->
last
)
?
TSDB_READ_LAST_FILE
(
pReadh
)
:
TSDB_READ_DATA_FILE
(
pReadh
);
SBlockCol
blockCol
=
{
0
};
...
...
@@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if
(
pDataCol
==
NULL
)
continue
;
ASSERT
(
pDataCol
->
colId
==
colId
);
if
(
colId
==
0
)
{
// load the key row
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
// load the key row
blockCol
.
colId
=
colId
;
blockCol
.
len
=
pBlock
->
keyLen
;
blockCol
.
type
=
pDataCol
->
type
;
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
6be271c8
...
...
@@ -28,7 +28,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcess
QueryContinue
Msg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcess
CQuery
Msg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_SCHEDULE_DATA_SINK
:
return
qWorkerProcessDataSinkMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
default:
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
6be271c8
...
...
@@ -47,6 +47,11 @@ enum {
CTG_RENT_STABLE
,
};
typedef
struct
SCTGDebug
{
int32_t
lockDebug
;
}
SCTGDebug
;
typedef
struct
SVgroupListCache
{
int32_t
vgroupVersion
;
SHashObj
*
cache
;
// key:vgId, value:SVgroupInfo
...
...
@@ -134,20 +139,22 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug
("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug
("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
...
...
@@ -155,15 +162,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug
("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug
("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug
("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug
("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
6be271c8
...
...
@@ -20,6 +20,9 @@
SCatalogMgmt
ctgMgmt
=
{
0
};
SCTGDebug
gCTGDebug
=
{
0
};
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
bool
*
inCache
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
inCache
=
false
;
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
6be271c8
...
...
@@ -196,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pOutput
->
queryEnd
=
pDispatcher
->
queryEnd
;
pOutput
->
needSchedule
=
false
;
pOutput
->
scheduleJobNo
=
0
;
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
precision
=
pDispatcher
->
schema
.
precision
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
6be271c8
...
...
@@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SResFetchReq
*
pMsg
=
calloc
(
1
,
sizeof
(
SResFetchReq
));
if
(
NULL
==
pMsg
)
{
// todo handle malloc error
}
SEpSet
epSet
;
...
...
@@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
cond
.
numOfCols
=
taosArrayGetSize
(
pTableScanNode
->
scan
.
node
.
pTargets
);
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
cond
.
twindow
=
pTableScanNode
->
window
;
cond
.
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
for
(
int32_t
i
=
0
;
i
<
cond
.
numOfCols
;
++
i
)
{
SExprInfo
*
pExprInfo
=
taosArrayGetP
(
pTableScanNode
->
scan
.
node
.
pTargets
,
i
);
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
6be271c8
...
...
@@ -27,14 +27,30 @@ extern "C" {
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum
{
QW_READY_NOT_RECEIVED
=
0
,
QW_READY_RECEIVED
,
QW_READY_RESPONSED
,
QW_PHASE_PRE_QUERY
=
1
,
QW_PHASE_POST_QUERY
,
QW_PHASE_PRE_CQUERY
,
QW_PHASE_POST_CQUERY
,
QW_PHASE_PRE_SINK
,
QW_PHASE_POST_SINK
,
QW_PHASE_PRE_FETCH
,
QW_PHASE_POST_FETCH
,
};
enum
{
QW_TASK_INFO_STATUS
=
1
,
QW_TASK_INFO_READY
,
QW_EVENT_CANCEL
=
1
,
QW_EVENT_READY
,
QW_EVENT_FETCH
,
QW_EVENT_DROP
,
QW_EVENT_CQUERY
,
QW_EVENT_MAX
,
};
enum
{
QW_EVENT_NOT_RECEIVED
=
0
,
QW_EVENT_RECEIVED
,
QW_EVENT_PROCESSED
,
};
enum
{
...
...
@@ -57,21 +73,45 @@ enum {
QW_ADD_ACQUIRE
,
};
typedef
struct
SQWDebug
{
int32_t
lockDebug
;
}
SQWDebug
;
typedef
struct
SQWMsg
{
void
*
node
;
char
*
msg
;
int32_t
msgLen
;
void
*
connection
;
}
SQWMsg
;
typedef
struct
SQWPhaseInput
{
int8_t
status
;
int32_t
code
;
qTaskInfo_t
taskHandle
;
DataSinkHandle
sinkHandle
;
}
SQWPhaseInput
;
typedef
struct
SQWPhaseOutput
{
int32_t
rspCode
;
bool
needStop
;
bool
needRsp
;
}
SQWPhaseOutput
;
typedef
struct
SQWTaskStatus
{
SRWLatch
lock
;
int32_t
code
;
int8_t
status
;
int8_t
ready
;
bool
cancel
;
bool
drop
;
}
SQWTaskStatus
;
typedef
struct
SQWTaskCtx
{
SRWLatch
lock
;
int8_t
sinkScheduled
;
int8_t
queryScheduled
;
int32_t
phase
;
int32_t
sinkId
;
int32_t
readyCode
;
int8_t
events
[
QW_EVENT_MAX
];
bool
needRsp
;
qTaskInfo_t
taskHandle
;
DataSinkHandle
sinkHandle
;
}
SQWTaskCtx
;
...
...
@@ -95,15 +135,22 @@ typedef struct SQWorkerMgmt {
putReqToQueryQFp
putToQueueFp
;
}
SQWorkerMgmt
;
#define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false)
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
@@ -123,21 +170,22 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug
("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug
("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
...
...
@@ -145,25 +193,21 @@ typedef struct SQWorkerMgmt {
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug
("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug
("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug
("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug
("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
int32_t
qwAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
);
int32_t
qwAcquireAddScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
);
int32_t
qwAcquireTask
(
SQWorkerMgmt
*
mgmt
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskStatus
**
task
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/inc/qworkerMsg.h
0 → 100644
浏览文件 @
6be271c8
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_MSG_H_
#define _TD_QWORKER_MSG_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t
qwProcessQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessCQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessReady
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessFetch
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessDrop
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwBuildAndSendDropRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendFetchRsp
(
void
*
connection
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
);
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
);
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
);
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
);
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_QWORKER_INT_H_*/
source/libs/qworker/src/qworker.c
浏览文件 @
6be271c8
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworkerMsg.c
0 → 100644
浏览文件 @
6be271c8
#include "qworker.h"
#include <common.h>
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "qworkerMsg.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
msgSize
);
if
(
NULL
==
pRsp
)
{
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
*
rsp
=
pRsp
;
return
TSDB_CODE_SUCCESS
;
}
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
rsp
->
useconds
=
htobe64
(
input
->
useconds
);
rsp
->
completed
=
input
->
queryEnd
;
rsp
->
precision
=
input
->
precision
;
rsp
->
compressed
=
input
->
compressed
;
rsp
->
compLen
=
htonl
(
len
);
rsp
->
numOfRows
=
htonl
(
input
->
numOfRows
);
}
void
qwFreeFetchRsp
(
void
*
msg
)
{
if
(
msg
)
{
rpcFreeCont
(
msg
);
}
}
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SResReadyRsp
*
pRsp
=
(
SResReadyRsp
*
)
rpcMallocCont
(
sizeof
(
SResReadyRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendStatusRsp
(
SRpcMsg
*
pMsg
,
SSchedulerStatusRsp
*
sStatus
)
{
int32_t
size
=
0
;
if
(
sStatus
)
{
size
=
sizeof
(
SSchedulerStatusRsp
)
+
sizeof
(
sStatus
->
status
[
0
])
*
sStatus
->
num
;
}
else
{
size
=
sizeof
(
SSchedulerStatusRsp
);
}
SSchedulerStatusRsp
*
pRsp
=
(
SSchedulerStatusRsp
*
)
rpcMallocCont
(
size
);
if
(
sStatus
)
{
memcpy
(
pRsp
,
sStatus
,
size
);
}
else
{
pRsp
->
num
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
pMsg
->
msgType
+
1
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
size
,
.
code
=
0
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
void
*
connection
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
if
(
NULL
==
pRsp
)
{
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
dataLength
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
)
+
dataLength
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCancelRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
STaskCancelRsp
*
pRsp
=
(
STaskCancelRsp
*
)
rpcMallocCont
(
sizeof
(
STaskCancelRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendDropRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
STaskDropRsp
*
pRsp
=
(
STaskDropRsp
*
)
rpcMallocCont
(
sizeof
(
STaskDropRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
int32_t
numOfCols
=
6
;
int32_t
msgSize
=
sizeof
(
SVShowTablesRsp
)
+
sizeof
(
SSchema
)
*
numOfCols
;
SVShowTablesRsp
*
pRsp
=
(
SVShowTablesRsp
*
)
rpcMallocCont
(
msgSize
);
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pRsp
->
metaInfo
.
pSchema
;
const
SSchema
*
s
=
tGetTbnameColumnSchema
();
*
pSchema
=
createSchema
(
s
->
type
,
htonl
(
s
->
bytes
),
htonl
(
++
cols
),
"name"
);
pSchema
++
;
int32_t
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"created"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_SMALLINT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"columns"
);
pSchema
++
;
*
pSchema
=
createSchema
(
s
->
type
,
htonl
(
s
->
bytes
),
htonl
(
++
cols
),
"stable"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_BIGINT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"uid"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_INT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"vgId"
);
assert
(
cols
==
numOfCols
);
pRsp
->
metaInfo
.
numOfColumns
=
htonl
(
cols
);
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
msgSize
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowFetchRsp
(
SRpcMsg
*
pMsg
,
SVShowTablesFetchReq
*
pFetchReq
)
{
SVShowTablesFetchRsp
*
pRsp
=
(
SVShowTablesFetchRsp
*
)
rpcMallocCont
(
sizeof
(
SVShowTablesFetchRsp
));
int32_t
handle
=
htonl
(
pFetchReq
->
id
);
pRsp
->
numOfRows
=
0
;
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
};
rpcSendResponse
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SSinkDataReq
*
req
=
(
SSinkDataReq
*
)
rpcMallocCont
(
sizeof
(
SSinkDataReq
));
if
(
NULL
==
req
)
{
qError
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SSinkDataReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_SCHEDULE_DATA_SINK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SSinkDataReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"put data sink schedule msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
qDebug
(
"put data sink schedule msg to query queue"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SQueryContinueReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_CONTINUE
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SQueryContinueReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
QW_SCH_TASK_ELOG
(
"put query continue msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
QW_SCH_TASK_DLOG
(
"put query continue msg to query queue, vgId:%d"
,
mgmt
->
nodeId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
,
.
msgLen
=
msg
->
contentLen
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p"
,
node
);
QW_RET
(
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processCQuery start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessCQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processCQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SSinkDataReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid sink data msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
//dsScheduleProcess();
//TODO
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SResReadyReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processReady start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessReady
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processReady end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SSchTasksStatusReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
htobe64
(
msg
->
sId
);
SSchedulerStatusRsp
*
sStatus
=
NULL
;
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
QW_ERR_RET
(
qwBuildAndSendStatusRsp
(
pMsg
,
sStatus
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processFetch start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessFetch
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processFetch end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
QW_ERR_RET
(
qwBuildAndSendCancelRsp
(
pMsg
,
code
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processDrop start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessDrop
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processDrop end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessShowMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SVShowTablesReq
*
pReq
=
pMsg
->
pCont
;
QW_ERR_RET
(
qwBuildAndSendShowRsp
(
pMsg
,
code
));
}
int32_t
qWorkerProcessShowFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SVShowTablesFetchReq
*
pFetchReq
=
pMsg
->
pCont
;
QW_ERR_RET
(
qwBuildAndSendShowFetchRsp
(
pMsg
,
pFetchReq
));
}
source/libs/scheduler/src/scheduler.c
浏览文件 @
6be271c8
...
...
@@ -1374,6 +1374,83 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
)
{
if
(
NULL
==
pDag
||
pDag
->
numOfSubplans
<=
0
||
taosArrayGetSize
(
pDag
->
pSubplans
)
<=
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
levelNum
=
taosArrayGetSize
(
pDag
->
pSubplans
);
if
(
1
!=
levelNum
)
{
qError
(
"invalid level num: %d"
,
levelNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SArray
*
plans
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
int32_t
taskNum
=
taosArrayGetSize
(
plans
);
if
(
taskNum
<=
0
)
{
qError
(
"invalid task num: %d"
,
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SArray
*
info
=
taosArrayInit
(
taskNum
,
sizeof
(
STaskInfo
));
if
(
NULL
==
info
)
{
qError
(
"taosArrayInit %d taskInfo failed"
,
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskInfo
tInfo
=
{
0
};
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
SSubplan
*
plan
=
taosArrayGetP
(
plans
,
i
);
tInfo
.
addr
=
plan
->
execNode
;
code
=
qSubPlanToString
(
plan
,
&
msg
,
&
msgLen
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
NULL
==
msg
||
msgLen
<=
0
)
{
qError
(
"subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
msg
,
msgLen
);
SCH_ERR_JRET
(
code
);
}
int32_t
msgSize
=
sizeof
(
SSubQueryMsg
)
+
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
tInfo
.
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
plan
->
id
.
queryId
);
pMsg
->
taskId
=
htobe64
(
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
));
pMsg
->
contentLen
=
htonl
(
msgLen
);
memcpy
(
pMsg
->
msg
,
msg
,
msgLen
);
tInfo
.
msg
=
pMsg
;
if
(
NULL
==
taosArrayPush
(
info
,
&
tInfo
))
{
qError
(
"taosArrayPush failed, idx:%d"
,
i
);
free
(
msg
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
*
pTasks
=
info
;
info
=
NULL
;
_return:
schedulerFreeTaskList
(
info
);
SCH_RET
(
code
);
}
int32_t
scheduleFetchRows
(
SSchJob
*
pJob
,
void
**
pData
)
{
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -1521,6 +1598,20 @@ void scheduleFreeJob(void *job) {
qDebug
(
"QID:%"
PRIx64
" job freed"
,
queryId
);
}
void
schedulerFreeTaskList
(
SArray
*
taskList
)
{
if
(
NULL
==
taskList
)
{
return
;
}
int32_t
taskNum
=
taosArrayGetSize
(
taskList
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskInfo
*
info
=
taosArrayGet
(
taskList
,
i
);
tfree
(
info
->
msg
);
}
taosArrayDestroy
(
taskList
);
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
jobs
)
{
...
...
source/libs/transport/inc/rpcLog.h
浏览文件 @
6be271c8
...
...
@@ -24,13 +24,49 @@ extern "C" {
extern
int32_t
rpcDebugFlag
;
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); }}
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); }}
#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); }}
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }}
// rpcDebugFlag = 143
#define tFatal(...) \
{ \
if (rpcDebugFlag & DEBUG_FATAL) { \
taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tError(...) \
{ \
if (rpcDebugFlag & DEBUG_ERROR) { \
taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tWarn(...) \
{ \
if (rpcDebugFlag & DEBUG_WARN) { \
taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tInfo(...) \
{ \
if (rpcDebugFlag & DEBUG_INFO) { \
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDebug(...) \
{ \
if (rpcDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tTrace(...) \
{ \
if (rpcDebugFlag & DEBUG_TRACE) { \
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDump(x, y) \
{ \
if (rpcDebugFlag & DEBUG_DUMP) { \
taosDumpData((unsigned char *)x, y); \
} \
}
#ifdef __cplusplus
}
...
...
source/libs/transport/src/transport.c
浏览文件 @
6be271c8
...
...
@@ -101,6 +101,13 @@ typedef struct SThreadObj {
void
*
shandle
;
}
SThreadObj
;
typedef
struct
SClientObj
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
}
SClientObj
;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
...
...
@@ -113,7 +120,7 @@ typedef struct SServerObj {
uv_tcp_t
server
;
uv_loop_t
*
loop
;
int
workerIdx
;
int
numOfThread
;
int
numOfThread
s
;
SThreadObj
**
pThreadObj
;
uv_pipe_t
**
pipe
;
uint32_t
ip
;
...
...
@@ -179,18 +186,37 @@ static void* acceptThread(void* arg);
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
(
*
taosHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitServer
,
taosInitClient
};
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
cli
=
calloc
(
1
,
sizeof
(
SClientObj
));
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SThreadObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SThreadObj
*
thrd
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
int
err
=
pthread_create
(
&
thrd
->
thread
,
NULL
,
workerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
}
}
return
cli
;
}
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SServerObj
*
srv
=
calloc
(
1
,
sizeof
(
SServerObj
));
srv
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
srv
->
numOfThread
=
numOfThreads
;
srv
->
numOfThread
s
=
numOfThreads
;
srv
->
workerIdx
=
0
;
srv
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
srv
->
numOfThread
,
sizeof
(
SThreadObj
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
calloc
(
srv
->
numOfThread
,
sizeof
(
uv_pipe_t
*
));
srv
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
srv
->
numOfThread
s
,
sizeof
(
SThreadObj
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
calloc
(
srv
->
numOfThread
s
,
sizeof
(
uv_pipe_t
*
));
srv
->
ip
=
ip
;
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
for
(
int
i
=
0
;
i
<
srv
->
numOfThread
;
i
++
)
{
for
(
int
i
=
0
;
i
<
srv
->
numOfThread
s
;
i
++
)
{
SThreadObj
*
thrd
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
calloc
(
2
,
sizeof
(
uv_pipe_t
));
int
fds
[
2
];
...
...
@@ -299,8 +325,7 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
if
(
!
rpcIsReq
(
pHead
->
msgType
))
{
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_USER_NOT_EXIST
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
...
...
@@ -460,7 +485,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_buf_t
buf
=
uv_buf_init
((
char
*
)
notify
,
strlen
(
notify
));
pObj
->
workerIdx
=
(
pObj
->
workerIdx
+
1
)
%
pObj
->
numOfThread
;
pObj
->
workerIdx
=
(
pObj
->
workerIdx
+
1
)
%
pObj
->
numOfThread
s
;
tDebug
(
"new conntion accepted by main server, dispatch to %dth worker-thread"
,
pObj
->
workerIdx
);
uv_write2
(
wr
,
(
uv_stream_t
*
)
&
(
pObj
->
pipe
[
pObj
->
workerIdx
][
0
]),
&
buf
,
1
,
(
uv_stream_t
*
)
cli
,
uvOnWriteCb
);
}
else
{
...
...
@@ -587,7 +612,9 @@ void* rpcOpen(const SRpcInit* pInit) {
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
));
}
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
tcphandle
=
taosInitServer
(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
tcphandle
=
(
*
taosHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
// pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
return
;
}
...
...
source/libs/transport/test/CMakeLists.txt
浏览文件 @
6be271c8
add_executable
(
transportTest
""
)
add_executable
(
client
""
)
add_executable
(
server
""
)
target_sources
(
transportTest
PRIVATE
"transportTests.cc"
)
target_sources
(
client
PRIVATE
"rclient.c"
)
target_sources
(
server
PRIVATE
"rserver.c"
)
target_include_directories
(
transportTest
PUBLIC
...
...
@@ -18,4 +29,30 @@ target_link_libraries (transportTest
transport
)
target_include_directories
(
client
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/transport"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
client
os
util
common
gtest_main
transport
)
target_include_directories
(
server
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/transport"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
server
os
util
common
gtest_main
transport
)
source/libs/transport/test/rclient.c
0 → 100644
浏览文件 @
6be271c8
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
typedef
struct
{
int
index
;
SEpSet
epSet
;
int
num
;
int
numOfReqs
;
int
msgSize
;
tsem_t
rspSem
;
tsem_t
*
pOverSem
;
pthread_t
thread
;
void
*
pRpc
;
}
SInfo
;
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
rpcFreeCont
(
pMsg
->
pCont
);
tsem_post
(
&
pInfo
->
rspSem
);
}
static
int
tcount
=
0
;
static
void
*
sendRequest
(
void
*
param
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
param
;
SRpcMsg
rpcMsg
=
{
0
};
tDebug
(
"thread:%d, start to send request"
,
pInfo
->
index
);
while
(
pInfo
->
numOfReqs
==
0
||
pInfo
->
num
<
pInfo
->
numOfReqs
)
{
pInfo
->
num
++
;
rpcMsg
.
pCont
=
rpcMallocCont
(
pInfo
->
msgSize
);
rpcMsg
.
contLen
=
pInfo
->
msgSize
;
rpcMsg
.
ahandle
=
pInfo
;
rpcMsg
.
msgType
=
1
;
tDebug
(
"thread:%d, send request, contLen:%d num:%d"
,
pInfo
->
index
,
pInfo
->
msgSize
,
pInfo
->
num
);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
tsem_wait
(
&
pInfo
->
rspSem
);
}
tDebug
(
"thread:%d, it is over"
,
pInfo
->
index
);
tcount
++
;
return
NULL
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
SRpcInit
rpcInit
;
SEpSet
epSet
;
int
msgSize
=
128
;
int
numOfReqs
=
0
;
int
appThreads
=
1
;
char
serverIp
[
40
]
=
"127.0.0.1"
;
char
secret
[
20
]
=
"mypassword"
;
struct
timeval
systemTime
;
int64_t
startTime
,
endTime
;
pthread_attr_t
thattr
;
// server info
epSet
.
numOfEps
=
1
;
epSet
.
inUse
=
0
;
epSet
.
port
[
0
]
=
7000
;
epSet
.
port
[
1
]
=
7000
;
strcpy
(
epSet
.
fqdn
[
0
],
serverIp
);
strcpy
(
epSet
.
fqdn
[
1
],
"192.168.0.1"
);
// client info
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"APP"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processResponse
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
secret
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
epSet
.
port
[
0
]
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
&&
i
<
argc
-
1
)
{
tstrncpy
(
epSet
.
fqdn
[
0
],
argv
[
++
i
],
sizeof
(
epSet
.
fqdn
[
0
]));
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
&&
i
<
argc
-
1
)
{
msgSize
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
sessions
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
&&
i
<
argc
-
1
)
{
numOfReqs
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-a"
)
==
0
&&
i
<
argc
-
1
)
{
appThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-o"
)
==
0
&&
i
<
argc
-
1
)
{
tsCompressMsgSize
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-u"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
user
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
secret
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-spi"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
spi
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-i ip]: first server IP address, default is:%s
\n
"
,
serverIp
);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
epSet
.
port
[
0
]);
printf
(
" [-t threads]: number of rpc threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
printf
(
" [-s sessions]: number of rpc sessions, default is:%d
\n
"
,
rpcInit
.
sessions
);
printf
(
" [-m msgSize]: message body size, default is:%d
\n
"
,
msgSize
);
printf
(
" [-a threads]: number of app threads, default is:%d
\n
"
,
appThreads
);
printf
(
" [-n requests]: number of requests per thread, default is:%d
\n
"
,
numOfReqs
);
printf
(
" [-o compSize]: compression message size, default is:%d
\n
"
,
tsCompressMsgSize
);
printf
(
" [-u user]: user name for the connection, default is:%s
\n
"
,
rpcInit
.
user
);
printf
(
" [-k secret]: password for the connection, default is:%s
\n
"
,
rpcInit
.
secret
);
printf
(
" [-spi SPI]: security parameter index, default is:%d
\n
"
,
rpcInit
.
spi
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
rpcDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
}
}
taosInitLog
(
"client.log"
,
100000
,
10
);
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
tError
(
"failed to initialize RPC"
);
return
-
1
;
}
tInfo
(
"client is initialized"
);
tInfo
(
"threads:%d msgSize:%d requests:%d"
,
appThreads
,
msgSize
,
numOfReqs
);
// gettimeofday(&systemTime, NULL);
// startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo
*
pInfo
=
(
SInfo
*
)
calloc
(
1
,
sizeof
(
SInfo
)
*
appThreads
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
for
(
int
i
=
0
;
i
<
appThreads
;
++
i
)
{
pInfo
->
index
=
i
;
pInfo
->
epSet
=
epSet
;
pInfo
->
numOfReqs
=
numOfReqs
;
pInfo
->
msgSize
=
msgSize
;
tsem_init
(
&
pInfo
->
rspSem
,
0
,
0
);
pInfo
->
pRpc
=
pRpc
;
pthread_create
(
&
pInfo
->
thread
,
&
thattr
,
sendRequest
,
pInfo
);
pInfo
++
;
}
do
{
usleep
(
1
);
}
while
(
tcount
<
appThreads
);
// gettimeofday(&systemTime, NULL);
// endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
int
ch
=
getchar
();
UNUSED
(
ch
);
taosCloseLog
();
return
0
;
}
source/libs/transport/test/rserver.c
0 → 100644
浏览文件 @
6be271c8
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
int
msgSize
=
128
;
int
commit
=
0
;
int
dataFd
=
-
1
;
STaosQueue
*
qhandle
=
NULL
;
STaosQset
*
qset
=
NULL
;
void
processShellMsg
()
{
static
int
num
=
0
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
void
*
pvnode
;
qall
=
taosAllocateQall
();
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
qset
,
qall
,
&
pvnode
,
NULL
);
tDebug
(
"%d shell msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
break
;
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
if
(
dataFd
>=
0
)
{
if
(
write
(
dataFd
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
)
<
0
)
{
tInfo
(
"failed to write data file, reason:%s"
,
strerror
(
errno
));
}
}
}
if
(
commit
>=
2
)
{
num
+=
numOfMsgs
;
// if (taosFsync(dataFd) < 0) {
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
//}
if
(
num
%
10000
==
0
)
{
tInfo
(
"%d request have been written into disk"
,
num
);
}
}
taosResetQitems
(
qall
);
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
memset
(
&
rpcMsg
,
0
,
sizeof
(
rpcMsg
));
rpcMsg
.
pCont
=
rpcMallocCont
(
msgSize
);
rpcMsg
.
contLen
=
msgSize
;
rpcMsg
.
handle
=
pRpcMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
taosFreeQitem
(
pRpcMsg
);
}
}
taosFreeQall
(
qall
);
}
int
retrieveAuthInfo
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
if
(
strcmp
(
meterId
,
"michael"
)
==
0
)
{
*
spi
=
1
;
*
encrypt
=
0
;
strcpy
(
secret
,
"mypassword"
);
strcpy
(
ckey
,
"key"
);
}
else
if
(
strcmp
(
meterId
,
"jeff"
)
==
0
)
{
*
spi
=
0
;
*
encrypt
=
0
;
}
else
{
ret
=
-
1
;
// user not there
}
return
ret
;
}
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
tDebug
(
"request is received, type:%d, contLen:%d, item:%p"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pTemp
);
taosWriteQitem
(
qhandle
,
pTemp
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
SRpcInit
rpcInit
;
char
dataName
[
20
]
=
"server.data"
;
taosBlockSIGPIPE
();
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
7000
;
rpcInit
.
label
=
"SER"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
localPort
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
&&
i
<
argc
-
1
)
{
msgSize
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
sessions
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-o"
)
==
0
&&
i
<
argc
-
1
)
{
tsCompressMsgSize
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
&&
i
<
argc
-
1
)
{
commit
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
dDebugFlag
=
rpcDebugFlag
;
uDebugFlag
=
rpcDebugFlag
;
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
rpcInit
.
localPort
);
printf
(
" [-t threads]: number of rpc threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
printf
(
" [-s sessions]: number of sessions, default is:%d
\n
"
,
rpcInit
.
sessions
);
printf
(
" [-m msgSize]: message body size, default is:%d
\n
"
,
msgSize
);
printf
(
" [-o compSize]: compression message size, default is:%d
\n
"
,
tsCompressMsgSize
);
printf
(
" [-w write]: write received data to file(0, 1, 2), default is:%d
\n
"
,
commit
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
rpcDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
}
}
tsAsyncLog
=
0
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
taosInitLog
(
"server.log"
,
100000
,
10
);
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
tError
(
"failed to start RPC server"
);
return
-
1
;
}
tInfo
(
"RPC server is running, ctrl-c to exit"
);
if
(
commit
)
{
dataFd
=
open
(
dataName
,
O_APPEND
|
O_CREAT
|
O_WRONLY
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
dataFd
<
0
)
tInfo
(
"failed to open data file, reason:%s"
,
strerror
(
errno
));
}
qhandle
=
taosOpenQueue
();
qset
=
taosOpenQset
();
taosAddIntoQset
(
qset
,
qhandle
,
NULL
);
processShellMsg
();
if
(
dataFd
>=
0
)
{
close
(
dataFd
);
remove
(
dataName
);
}
return
0
;
}
source/util/src/terror.c
浏览文件 @
6be271c8
...
...
@@ -354,6 +354,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_NOT_EXIST
,
"Task not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
,
"Task already exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
,
"Task result cache not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_CANCELLED
,
"Task cancelled"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPED
,
"Task dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_CANCELLING
,
"Task cancelling"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPING
,
"Task dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
// grant
TAOS_DEFINE_ERROR
(
TSDB_CODE_GRANT_EXPIRED
,
"License expired"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录