Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0cbff0a8
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
0cbff0a8
编写于
4月 25, 2022
作者:
C
Cary Xu
提交者:
GitHub
4月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
ad9101d3
93bf4fef
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
286 addition
and
219 deletion
+286
-219
example/src/tmq.c
example/src/tmq.c
+4
-4
include/common/tmsg.h
include/common/tmsg.h
+32
-78
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+4
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+32
-32
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+14
-15
source/client/src/tmq.c
source/client/src/tmq.c
+4
-2
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+11
-0
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+44
-7
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+41
-40
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+5
-2
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-5
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+24
-6
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+45
-21
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+11
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+1
-1
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+3
-3
source/util/src/terror.c
source/util/src/terror.c
+3
-0
tools/shell/src/shellCommand.c
tools/shell/src/shellCommand.c
+1
-1
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-1
未找到文件。
example/src/tmq.c
浏览文件 @
0cbff0a8
...
...
@@ -60,7 +60,6 @@ int32_t init_env() {
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
assert
(
0
);
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
...
...
@@ -104,8 +103,8 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as abc1"
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -163,9 +162,10 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
"abc1"
);
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set_offset_commit_cb
(
conf
,
tmq_commit_cb_print
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
return
tmq
;
}
...
...
include/common/tmsg.h
浏览文件 @
0cbff0a8
...
...
@@ -2069,80 +2069,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return
buf
;
}
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int32_t
epoch
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
}
SMqCancelConnReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqCancelConnReq
(
void
**
buf
,
const
SMqCancelConnReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topicName
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqCancelConnReq
(
void
*
buf
,
SMqCancelConnReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
topicName
);
return
buf
;
}
typedef
struct
{
int8_t
reserved
;
}
SMqCancelConnRsp
;
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
char
*
topic
;
}
SMqMVRebReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqMVRebReq
(
void
**
buf
,
const
SMqMVRebReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topic
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqMVRebReq
(
void
*
buf
,
SMqMVRebReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
topic
);
return
buf
;
}
typedef
struct
{
SMsgHead
header
;
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
}
SMqSetCVgRsp
;
typedef
struct
{
SMsgHead
header
;
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
}
SMqMVRebRsp
;
typedef
struct
{
int32_t
vgId
;
int64_t
offset
;
...
...
@@ -2169,6 +2095,24 @@ typedef struct {
SSchema
*
pSchema
;
}
SSchemaWrapper
;
static
FORCE_INLINE
SSchemaWrapper
*
tCloneSSchemaWrapper
(
const
SSchemaWrapper
*
pSchemaWrapper
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosMemoryMalloc
(
sizeof
(
SSchemaWrapper
));
if
(
pSW
==
NULL
)
return
pSW
;
pSW
->
nCols
=
pSchemaWrapper
->
nCols
;
pSW
->
pSchema
=
(
SSchema
*
)
taosMemoryCalloc
(
pSW
->
nCols
,
sizeof
(
SSchema
));
if
(
pSW
->
pSchema
==
NULL
)
{
taosMemoryFree
(
pSW
);
return
NULL
;
}
memcpy
(
pSW
->
pSchema
,
pSchemaWrapper
->
pSchema
,
pSW
->
nCols
*
sizeof
(
SSchema
));
return
pSW
;
}
static
FORCE_INLINE
void
tDeleteSSchemaWrapper
(
SSchemaWrapper
*
pSchemaWrapper
)
{
taosMemoryFree
(
pSchemaWrapper
->
pSchema
);
taosMemoryFree
(
pSchemaWrapper
);
}
static
FORCE_INLINE
int32_t
taosEncodeSSchema
(
void
**
buf
,
const
SSchema
*
pSchema
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pSchema
->
type
);
...
...
@@ -2179,13 +2123,13 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSSchema
(
void
*
buf
,
SSchema
*
pSchema
)
{
static
FORCE_INLINE
void
*
taosDecodeSSchema
(
const
void
*
buf
,
SSchema
*
pSchema
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pSchema
->
type
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSchema
->
flags
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSchema
->
bytes
);
buf
=
taosDecodeFixedI16
(
buf
,
&
pSchema
->
colId
);
buf
=
taosDecodeStringTo
(
buf
,
pSchema
->
name
);
return
buf
;
return
(
void
*
)
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSSchema
(
SCoder
*
pEncoder
,
const
SSchema
*
pSchema
)
{
...
...
@@ -2215,7 +2159,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSSchemaWrapper
(
void
*
buf
,
SSchemaWrapper
*
pSW
)
{
static
FORCE_INLINE
void
*
taosDecodeSSchemaWrapper
(
const
void
*
buf
,
SSchemaWrapper
*
pSW
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
pSW
->
nCols
);
pSW
->
pSchema
=
(
SSchema
*
)
taosMemoryCalloc
(
pSW
->
nCols
,
sizeof
(
SSchema
));
if
(
pSW
->
pSchema
==
NULL
)
{
...
...
@@ -2225,7 +2169,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pS
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
buf
=
taosDecodeSSchema
(
buf
,
&
pSW
->
pSchema
[
i
]);
}
return
buf
;
return
(
void
*
)
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSSchemaWrapper
(
SCoder
*
pEncoder
,
const
SSchemaWrapper
*
pSW
)
{
...
...
@@ -2632,6 +2576,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
void
*
data
=
taosArrayGetP
(
pRsp
->
blockData
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
bLen
);
tlen
+=
taosEncodeBinary
(
buf
,
data
,
bLen
);
if
(
pRsp
->
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
pRsp
->
blockSchema
,
i
);
tlen
+=
taosEncodeSSchemaWrapper
(
buf
,
pSW
);
}
}
}
return
tlen
;
...
...
@@ -2644,6 +2592,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
blockNum
);
pRsp
->
blockData
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
pRsp
->
blockSchema
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
if
(
pRsp
->
blockNum
!=
0
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withSchema
);
...
...
@@ -2656,6 +2605,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf
=
taosDecodeBinary
(
buf
,
&
data
,
bLen
);
taosArrayPush
(
pRsp
->
blockDataLen
,
&
bLen
);
taosArrayPush
(
pRsp
->
blockData
,
&
data
);
if
(
pRsp
->
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosMemoryMalloc
(
sizeof
(
SSchemaWrapper
));
buf
=
taosDecodeSSchemaWrapper
(
buf
,
pSW
);
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
}
}
}
return
(
void
*
)
buf
;
...
...
source/client/inc/clientInt.h
浏览文件 @
0cbff0a8
...
...
@@ -231,6 +231,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
msg
->
resIter
++
;
if
(
msg
->
resIter
<
msg
->
rsp
.
blockNum
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
msg
->
rsp
.
blockData
,
msg
->
resIter
);
if
(
msg
->
rsp
.
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
msg
->
rsp
.
blockSchema
,
msg
->
resIter
);
setResSchemaInfo
(
&
msg
->
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
}
setQueryResultFromRsp
(
&
msg
->
resInfo
,
pRetrieve
,
convertUcs4
);
return
&
msg
->
resInfo
;
}
...
...
source/client/src/clientHb.c
浏览文件 @
0cbff0a8
...
...
@@ -14,9 +14,9 @@
*/
#include "catalog.h"
#include "scheduler.h"
#include "clientInt.h"
#include "clientLog.h"
#include "scheduler.h"
#include "trpc.h"
static
SClientHbMgr
clientHbMgr
=
{
0
};
...
...
@@ -110,7 +110,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
static
int32_t
hbQueryHbRspHandle
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
)
{
SHbConnInfo
*
info
=
taosHashGet
(
pAppHbMgr
->
connInfo
,
&
pRsp
->
connKey
,
sizeof
(
SClientHbKey
));
if
(
NULL
==
info
)
{
tscWarn
(
"fail to get connInfo, may be dropped, refId:%"
PRIx64
", type:%d"
,
pRsp
->
connKey
.
tscRid
,
pRsp
->
connKey
.
connType
);
tscWarn
(
"fail to get connInfo, may be dropped, refId:%"
PRIx64
", type:%d"
,
pRsp
->
connKey
.
tscRid
,
pRsp
->
connKey
.
connType
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -121,7 +122,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
else
{
updateEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
,
&
pRsp
->
query
->
epSet
);
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
if
(
pRsp
->
query
->
killRid
)
{
SRequestObj
*
pRequest
=
acquireRequest
(
pRsp
->
query
->
killRid
);
if
(
NULL
==
pRequest
)
{
...
...
@@ -131,7 +132,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
releaseRequest
(
pRsp
->
query
->
killRid
);
}
}
if
(
pRsp
->
query
->
killConnection
)
{
taos_close
(
pTscObj
);
}
...
...
@@ -139,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
releaseTscObj
(
pRsp
->
connKey
.
tscRid
);
}
}
int32_t
kvNum
=
pRsp
->
info
?
taosArrayGetSize
(
pRsp
->
info
)
:
0
;
tscDebug
(
"hb got %d rsp kv"
,
kvNum
);
...
...
@@ -236,24 +237,24 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
}
int32_t
hbBuildQueryDesc
(
SQueryHbReqBasic
*
hbBasic
,
STscObj
*
pObj
)
{
int64_t
now
=
taosGetTimestampUs
();
int64_t
now
=
taosGetTimestampUs
();
SQueryDesc
desc
=
{
0
};
int32_t
code
=
0
;
int32_t
code
=
0
;
void
*
pIter
=
taosHashIterate
(
pObj
->
pRequests
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pObj
->
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
int64_t
*
rid
=
pIter
;
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
NULL
==
pRequest
)
{
continue
;
}
tstrncpy
(
desc
.
sql
,
pRequest
->
sqlstr
,
sizeof
(
desc
.
sql
));
desc
.
stime
=
pRequest
->
metric
.
start
;
desc
.
queryId
=
pRequest
->
requestId
;
desc
.
stime
=
pRequest
->
metric
.
start
;
desc
.
queryId
=
pRequest
->
requestId
;
desc
.
useconds
=
now
-
pRequest
->
metric
.
start
;
desc
.
reqRid
=
pRequest
->
self
;
desc
.
pid
=
hbBasic
->
pid
;
desc
.
reqRid
=
pRequest
->
self
;
desc
.
pid
=
hbBasic
->
pid
;
taosGetFqdn
(
desc
.
fqdn
);
desc
.
subPlanNum
=
pRequest
->
body
.
pDag
?
pRequest
->
body
.
pDag
->
numOfSubplans
:
0
;
...
...
@@ -271,9 +272,9 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
}
}
releaseRequest
(
*
rid
);
releaseRequest
(
*
rid
);
taosArrayPush
(
hbBasic
->
queryDesc
,
&
desc
);
pIter
=
taosHashIterate
(
pObj
->
pRequests
,
pIter
);
}
...
...
@@ -286,14 +287,14 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
tscWarn
(
"tscObj rid %"
PRIx64
" not exist"
,
connKey
->
tscRid
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
int32_t
numOfQueries
=
pTscObj
->
pRequests
?
taosHashGetSize
(
pTscObj
->
pRequests
)
:
0
;
if
(
numOfQueries
<=
0
)
{
releaseTscObj
(
connKey
->
tscRid
);
tscDebug
(
"no queries on connection"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
SQueryHbReqBasic
*
hbBasic
=
(
SQueryHbReqBasic
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SQueryHbReqBasic
));
if
(
NULL
==
hbBasic
)
{
tscError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SQueryHbReqBasic
));
...
...
@@ -308,7 +309,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
taosMemoryFree
(
hbBasic
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
hbBasic
->
connId
=
pTscObj
->
connId
;
hbBasic
->
pid
=
taosGetPId
();
taosGetAppName
(
hbBasic
->
app
,
NULL
);
...
...
@@ -405,7 +406,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
}
hbGetQueryBasicInfo
(
connKey
,
req
);
code
=
hbGetExpiredDBInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
...
...
@@ -471,10 +472,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter
=
taosHashIterate
(
pAppHbMgr
->
activeInfo
,
pIter
);
}
// if (code) {
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
// taosMemoryFreeClear(pBatchReq);
// }
// if (code) {
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
// taosMemoryFreeClear(pBatchReq);
// }
return
pBatchReq
;
}
...
...
@@ -630,24 +631,23 @@ void appHbMgrCleanup(void) {
int
sz
=
taosArrayGetSize
(
clientHbMgr
.
appHbMgrs
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SAppHbMgr
*
pTarget
=
taosArrayGetP
(
clientHbMgr
.
appHbMgrs
,
i
);
void
*
pIter
=
taosHashIterate
(
pTarget
->
activeInfo
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pTarget
->
activeInfo
,
NULL
);
while
(
pIter
!=
NULL
)
{
SClientHbReq
*
pOneReq
=
pIter
;
hbFreeReq
(
pOneReq
);
taosHashCleanup
(
pOneReq
->
info
);
pIter
=
taosHashIterate
(
pTarget
->
activeInfo
,
pIter
);
}
}
taosHashCleanup
(
pTarget
->
activeInfo
);
pTarget
->
activeInfo
=
NULL
;
pIter
=
taosHashIterate
(
pTarget
->
connInfo
,
NULL
);
while
(
pIter
!=
NULL
)
{
SHbConnInfo
*
info
=
pIter
;
taosMemoryFree
(
info
->
param
);
pIter
=
taosHashIterate
(
pTarget
->
connInfo
,
pIter
);
}
}
taosHashCleanup
(
pTarget
->
connInfo
);
pTarget
->
connInfo
=
NULL
;
...
...
@@ -668,13 +668,13 @@ int hbMgrInit() {
hbMgrInitHandle
();
// init backgroud thread
/
/hbCreateThread();
/
*hbCreateThread();*/
return
0
;
}
void
hbMgrCleanUp
()
{
//hbStopThread();
//
hbStopThread();
// destroy all appHbMgr
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
1
,
0
);
...
...
@@ -747,11 +747,11 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
taosMemoryFree
(
info
->
param
);
taosHashRemove
(
pAppHbMgr
->
connInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
}
if
(
NULL
==
pReq
||
NULL
==
info
)
{
return
;
}
atomic_sub_fetch_32
(
&
pAppHbMgr
->
connKeyCnt
,
1
);
}
...
...
source/client/src/clientImpl.c
浏览文件 @
0cbff0a8
...
...
@@ -226,17 +226,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{
.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
cxt
.
pCatalog
);
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
cxt
.
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
}
...
...
@@ -247,6 +245,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
ASSERT
(
pSchema
!=
NULL
&&
numOfCols
>
0
);
pResInfo
->
numOfCols
=
numOfCols
;
// TODO handle memory leak
pResInfo
->
fields
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
TAOS_FIELD
));
pResInfo
->
userFields
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
TAOS_FIELD
));
...
...
@@ -282,7 +281,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -840,12 +839,12 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
return
code
;
}
char
*
p
=
(
char
*
)
pResultInfo
->
pData
;
char
*
p
=
(
char
*
)
pResultInfo
->
pData
;
int32_t
dataLen
=
*
(
int32_t
*
)
p
;
int32_t
dataLen
=
*
(
int32_t
*
)
p
;
p
+=
sizeof
(
int32_t
);
uint64_t
groupId
=
*
(
uint64_t
*
)
p
;
uint64_t
groupId
=
*
(
uint64_t
*
)
p
;
p
+=
sizeof
(
uint64_t
);
int32_t
*
colLength
=
(
int32_t
*
)
p
;
...
...
source/client/src/tmq.c
浏览文件 @
0cbff0a8
...
...
@@ -376,7 +376,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
ASSERT
(
user
);
ASSERT
(
pass
);
ASSERT
(
conf
->
db
);
/*ASSERT(conf->db);*/
ASSERT
(
conf
->
groupId
[
0
]);
pTmq
->
pTscObj
=
taos_connect_internal
(
conf
->
ip
,
user
,
pass
,
NULL
,
conf
->
db
,
conf
->
port
,
CONN_TYPE__TMQ
);
...
...
@@ -1118,7 +1118,9 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
pRspObj
->
resInfo
.
totalRows
=
0
;
pRspObj
->
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
if
(
!
pWrapper
->
msg
.
withSchema
)
{
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
}
taosFreeQitem
(
pWrapper
);
return
pRspObj
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
0cbff0a8
...
...
@@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return
code
;
}
code
=
vnodeStart
(
pImpl
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dError
(
"vgId:%d, failed to start sync since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
}
code
=
vmWriteVnodesToFile
(
pMgmt
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
0cbff0a8
...
...
@@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return
-
1
;
}
// sync integration
vnodeSyncSetQ
(
pImpl
,
NULL
);
vnodeSyncSetRpc
(
pImpl
,
NULL
);
int32_t
ret
=
vnodeSyncStart
(
pImpl
);
assert
(
ret
==
0
);
taosWLockLatch
(
&
pMgmt
->
latch
);
int32_t
code
=
taosHashPut
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
&
pVnode
,
sizeof
(
SVnodeObj
*
));
taosWUnLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
pThread
->
failed
++
;
}
else
{
vmOpenVnode
(
pMgmt
,
pCfg
,
pImpl
);
//vnodeStart(pImpl);
dDebug
(
"vgId:%d, is opened by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
opened
++
;
}
...
...
@@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
return
0
;
}
static
int32_t
vmStart
(
SMgmtWrapper
*
pWrapper
)
{
dDebug
(
"vnode-mgmt start to run"
);
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
vnodeStart
(
pVnode
->
pImpl
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
return
0
;
}
static
void
vmStop
(
SMgmtWrapper
*
pWrapper
)
{
#if 0
dDebug("vnode-mgmt start to stop");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStop(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
#endif
}
void
vmSetMgmtFp
(
SMgmtWrapper
*
pWrapper
)
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
vmInit
;
mgmtFp
.
closeFp
=
vmCleanup
;
mgmtFp
.
startFp
=
vmStart
;
mgmtFp
.
stopFp
=
vmStop
;
mgmtFp
.
requiredFp
=
vmRequire
;
vmInitMsgHandle
(
pWrapper
);
...
...
@@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
\ No newline at end of file
}
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
0cbff0a8
...
...
@@ -476,32 +476,36 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t
mndSchedInitSubEp
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pTopic
->
physicalPlan
);
if
(
pPlan
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
SQueryPlan
*
pPlan
=
NULL
;
SSubplan
*
plan
=
NULL
;
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pPlan
=
qStringToQueryPlan
(
pTopic
->
physicalPlan
);
if
(
pPlan
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
ASSERT
(
pSub
->
vgNum
==
-
1
);
ASSERT
(
pSub
->
vgNum
==
-
1
);
pSub
->
vgNum
=
0
;
pSub
->
vgNum
=
0
;
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
if
(
levelNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_MND_UNSUPPORTED_TOPIC
;
return
-
1
;
}
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
if
(
levelNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_MND_UNSUPPORTED_TOPIC
;
return
-
1
;
}
SNodeListNode
*
inner
=
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
SNodeListNode
*
inner
=
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
int32_t
opNum
=
LIST_LENGTH
(
inner
->
pNodeList
);
if
(
opNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_MND_UNSUPPORTED_TOPIC
;
return
-
1
;
int32_t
opNum
=
LIST_LENGTH
(
inner
->
pNodeList
);
if
(
opNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_MND_UNSUPPORTED_TOPIC
;
return
-
1
;
}
plan
=
nodesListGetNode
(
inner
->
pNodeList
,
0
);
}
SSubplan
*
plan
=
nodesListGetNode
(
inner
->
pNodeList
,
0
);
int64_t
unexistKey
=
-
1
;
SMqConsumerEpInSub
*
pEpInSub
=
taosHashGet
(
pSub
->
consumerHash
,
&
unexistKey
,
sizeof
(
int64_t
));
...
...
@@ -519,38 +523,35 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
}
pSub
->
vgNum
++
;
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
SMqVgEp
*
pVgEp
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
pVgEp
->
epSet
=
plan
->
execNode
.
epSet
;
pVgEp
->
vgId
=
plan
->
execNode
.
nodeId
;
#if 0
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
#endif
pVgEp
->
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
pVgEp
->
vgId
=
pVgroup
->
vgId
;
taosArrayPush
(
pEpInSub
->
vgs
,
&
pVgEp
);
mDebug
(
"init subscribption %s, assign vg: %d"
,
pSub
->
key
,
pVgEp
->
vgId
);
int32_t
msgLen
;
if
(
qSubPlanToString
(
plan
,
&
pVgEp
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
int32_t
msgLen
;
plan
->
execNode
.
epSet
=
pVgEp
->
epSet
;
plan
->
execNode
.
nodeId
=
pVgEp
->
vgId
;
if
(
qSubPlanToString
(
plan
,
&
pVgEp
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
}
else
{
pVgEp
->
qmsg
=
strdup
(
""
);
}
taosArrayPush
(
pEpInSub
->
vgs
,
&
pVgEp
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
1
);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
ASSERT
(
pEpInSub
->
vgs
->
size
>
0
);
pEpInSub
=
taosHashGet
(
pSub
->
consumerHash
,
&
unexistKey
,
sizeof
(
int64_t
));
ASSERT
(
pEpInSub
->
vgs
->
size
>
0
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
0cbff0a8
...
...
@@ -282,10 +282,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj
.
version
=
1
;
topicObj
.
sql
=
strdup
(
pCreate
->
sql
);
topicObj
.
sqlLen
=
strlen
(
pCreate
->
sql
)
+
1
;
topicObj
.
ast
=
strdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
if
(
pCreate
->
ast
&&
pCreate
->
ast
[
0
])
{
topicObj
.
ast
=
strdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
topicObj
.
subType
=
TOPIC_SUB_TYPE__TABLE
;
topicObj
.
withTbName
=
0
;
topicObj
.
withSchema
=
0
;
...
...
@@ -314,6 +314,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
return
-
1
;
}
}
else
{
topicObj
.
ast
=
strdup
(
""
);
topicObj
.
astLen
=
1
;
topicObj
.
physicalPlan
=
strdup
(
""
);
topicObj
.
subType
=
TOPIC_SUB_TYPE__DB
;
topicObj
.
withTbName
=
1
;
topicObj
.
withSchema
=
1
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
0cbff0a8
...
...
@@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableFName
);
int32_t
vnodeStart
(
SVnode
*
pVnode
);
void
vnodeStop
(
SVnode
*
pVnode
);
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
);
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
);
...
...
@@ -171,11 +174,6 @@ typedef struct {
uint64_t
uid
;
}
STableKeyInfo
;
// sync integration
void
vnodeSyncSetQ
(
SVnode
*
pVnode
,
void
*
qHandle
);
void
vnodeSyncSetRpc
(
SVnode
*
pVnode
,
void
*
rpcHandle
);
int32_t
vnodeSyncStart
(
SVnode
*
pVnode
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
0cbff0a8
...
...
@@ -410,8 +410,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
withSchema
=
pExec
->
withSchema
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pExec
->
epoch
);
...
...
@@ -511,6 +513,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
rsp
.
blockDataLen
,
&
actualLen
);
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
if
(
pExec
->
withSchema
)
{
SSchemaWrapper
*
pSW
=
tCloneSSchemaWrapper
(
pExec
->
pExecReader
[
workerId
]
->
pSchemaWrapper
);
taosArrayPush
(
rsp
.
blockSchema
,
&
pSW
);
}
rsp
.
blockNum
++
;
}
// db subscribe
...
...
@@ -539,6 +547,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
rsp
.
blockDataLen
,
&
actualLen
);
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
SSchemaWrapper
*
pSW
=
tCloneSSchemaWrapper
(
pExec
->
pExecReader
[
workerId
]
->
pSchemaWrapper
);
taosArrayPush
(
rsp
.
blockSchema
,
&
pSW
);
rsp
.
blockNum
++
;
}
}
else
{
...
...
@@ -585,6 +597,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
taosArrayDestroy
(
rsp
.
blockDataLen
);
taosArrayDestroyP
(
rsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
return
0
;
}
...
...
@@ -826,12 +840,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec
->
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pExec
->
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pExec
->
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
ASSERT
(
pExec
->
task
[
i
]);
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
SReadHandle
handle
=
{
.
reader
=
pExec
->
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
ASSERT
(
pExec
->
task
[
i
]);
}
else
{
pExec
->
task
[
i
]
=
NULL
;
}
}
taosHashPut
(
pTq
->
execs
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pExec
,
sizeof
(
STqExec
));
return
0
;
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
0cbff0a8
...
...
@@ -65,7 +65,9 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT
(
pHandle
->
tbIdHash
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
return
true
;
}
void
*
ret
=
taosHashGet
(
pHandle
->
tbIdHash
,
&
pHandle
->
msgIter
.
uid
,
sizeof
(
int64_t
));
if
(
ret
!=
NULL
)
{
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
...
...
@@ -109,26 +111,15 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
*
pNumOfRows
=
pHandle
->
pBlock
->
numOfRows
;
int32_t
colNumNeed
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
if
(
colNumNeed
>
pSchemaWrapper
->
nCols
)
{
colNumNeed
=
pSchemaWrapper
->
nCols
;
}
*
ppCols
=
taosArrayInit
(
colNumNeed
,
sizeof
(
SColumnInfoData
));
if
(
*
ppCols
==
NULL
)
{
return
-
1
;
}
if
(
colNumNeed
==
0
)
{
*
ppCols
=
taosArrayInit
(
pSchemaWrapper
->
nCols
,
sizeof
(
SColumnInfoData
));
if
(
*
ppCols
==
NULL
)
{
return
-
1
;
}
int32_t
colMeta
=
0
;
int32_t
colNeed
=
0
;
while
(
colMeta
<
pSchemaWrapper
->
nCols
&&
colNeed
<
colNumNeed
)
{
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
colMeta
];
col_id_t
colIdSchema
=
pColSchema
->
colId
;
col_id_t
colIdNeed
=
*
(
col_id_t
*
)
taosArrayGet
(
pHandle
->
pColIdList
,
colNeed
);
if
(
colIdSchema
<
colIdNeed
)
{
colMeta
++
;
}
else
if
(
colIdSchema
>
colIdNeed
)
{
colNeed
++
;
}
else
{
int32_t
colMeta
=
0
;
while
(
colMeta
<
pSchemaWrapper
->
nCols
)
{
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
colMeta
];
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
pColSchema
->
colId
;
...
...
@@ -139,7 +130,40 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
}
taosArrayPush
(
*
ppCols
,
&
colInfo
);
colMeta
++
;
colNeed
++
;
}
}
else
{
if
(
colNumNeed
>
pSchemaWrapper
->
nCols
)
{
colNumNeed
=
pSchemaWrapper
->
nCols
;
}
*
ppCols
=
taosArrayInit
(
colNumNeed
,
sizeof
(
SColumnInfoData
));
if
(
*
ppCols
==
NULL
)
{
return
-
1
;
}
int32_t
colMeta
=
0
;
int32_t
colNeed
=
0
;
while
(
colMeta
<
pSchemaWrapper
->
nCols
&&
colNeed
<
colNumNeed
)
{
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
colMeta
];
col_id_t
colIdSchema
=
pColSchema
->
colId
;
col_id_t
colIdNeed
=
*
(
col_id_t
*
)
taosArrayGet
(
pHandle
->
pColIdList
,
colNeed
);
if
(
colIdSchema
<
colIdNeed
)
{
colMeta
++
;
}
else
if
(
colIdSchema
>
colIdNeed
)
{
colNeed
++
;
}
else
{
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
pColSchema
->
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
if
(
colInfoDataEnsureCapacity
(
&
colInfo
,
0
,
*
pNumOfRows
)
<
0
)
{
goto
FAIL
;
}
taosArrayPush
(
*
ppCols
,
&
colInfo
);
colMeta
++
;
colNeed
++
;
}
}
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
0cbff0a8
...
...
@@ -14,6 +14,7 @@
*/
#include "vnodeInt.h"
#include "vnodeSync.h"
int
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
...
...
@@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
}
}
// start the sync timer after the queue is ready
int32_t
vnodeStart
(
SVnode
*
pVnode
)
{
vnodeSyncSetQ
(
pVnode
,
NULL
);
vnodeSyncSetRpc
(
pVnode
,
NULL
);
vnodeSyncStart
(
pVnode
);
return
0
;
}
void
vnodeStop
(
SVnode
*
pVnode
)
{}
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
)
{
return
pVnode
->
sync
;
}
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
lastApplyIndex
=
pVnode
->
state
.
committed
;
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
0cbff0a8
...
...
@@ -268,7 +268,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
int32_t
numOfCols
=
LIST_LENGTH
(
pNode
->
pSlots
);
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
info
.
numOfCols
=
numOfCols
;
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
blockId
=
pNode
->
dataBlockId
;
...
...
@@ -294,6 +293,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
taosArrayPush
(
pBlock
->
pDataBlock
,
&
idata
);
}
pBlock
->
info
.
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
return
pBlock
;
}
...
...
@@ -1032,6 +1032,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
pColInfo
->
info
.
bytes
=
tDataTypes
[
type
].
bytes
;
pInput
->
pData
[
paramIndex
]
=
pColInfo
;
}
else
{
pColInfo
=
pInput
->
pData
[
paramIndex
];
}
ASSERT
(
!
IS_VAR_DATA_TYPE
(
type
));
...
...
source/libs/function/src/builtins.c
浏览文件 @
0cbff0a8
...
...
@@ -355,7 +355,7 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
(
SDataType
)
{
.
bytes
=
2
4
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
pFunc
->
node
.
resType
=
(
SDataType
)
{
.
bytes
=
6
4
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
0cbff0a8
...
...
@@ -489,7 +489,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
if
(
fmIsUserDefinedFunc
(
node
->
funcId
))
{
return
DEAL_RES_CONTINUE
;
}
FOREACH
(
tnode
,
node
->
pParameterList
)
{
if
(
!
SCL_IS_CONST_NODE
(
tnode
))
{
return
DEAL_RES_CONTINUE
;
...
...
@@ -517,8 +517,8 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
res
->
node
.
resType
=
node
->
node
.
resType
;
int32_t
type
=
output
.
columnData
->
info
.
type
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
res
->
datum
.
p
=
output
.
columnData
->
pData
;
output
.
columnData
->
pData
=
NULL
;
res
->
datum
.
p
=
taosMemoryCalloc
(
res
->
node
.
resType
.
bytes
+
VARSTR_HEADER_SIZE
+
1
,
1
)
;
memcpy
(
res
->
datum
.
p
,
output
.
columnData
->
pData
,
varDataTLen
(
output
.
columnData
->
pData
))
;
}
else
{
memcpy
(
nodesGetValueFromNode
(
res
),
output
.
columnData
->
pData
,
tDataTypes
[
type
].
bytes
);
}
...
...
source/util/src/terror.c
浏览文件 @
0cbff0a8
...
...
@@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum"
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_MSGLEN
,
"Invalid msg length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_MSGTYPE
,
"Invalid msg type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_LEADER
,
"Sync not leader"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
// wal
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_APP_ERROR
,
"Unexpected generic error in wal"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_FILE_CORRUPTED
,
"WAL file is corrupted"
)
...
...
tools/shell/src/shellCommand.c
浏览文件 @
0cbff0a8
...
...
@@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) {
void
shellShowOnScreen
(
SShellCmd
*
cmd
)
{
struct
winsize
w
;
if
(
ioctl
(
0
,
TIOCGWINSZ
,
&
w
)
<
0
||
w
.
ws_col
==
0
||
w
.
ws_row
==
0
)
{
fprintf
(
stderr
,
"No stream device
\n
"
);
//
fprintf(stderr, "No stream device\n");
w
.
ws_col
=
120
;
w
.
ws_row
=
30
;
}
...
...
tools/shell/src/shellEngine.c
浏览文件 @
0cbff0a8
...
...
@@ -750,7 +750,7 @@ void shellReadHistory() {
void
shellWriteHistory
()
{
SShellHistory
*
pHistory
=
&
shell
.
history
;
TdFilePtr
pFile
=
taosOpenFile
(
pHistory
->
file
,
TD_FILE_
WRITE
|
TD_FILE_STREAM
);
TdFilePtr
pFile
=
taosOpenFile
(
pHistory
->
file
,
TD_FILE_
CREATE
|
TD_FILE_WRITE
|
TD_FILE_STREAM
|
TD_FILE_APPEND
);
if
(
pFile
==
NULL
)
return
;
for
(
int32_t
i
=
pHistory
->
hstart
;
i
!=
pHistory
->
hend
;)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录