Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9a930030
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9a930030
编写于
7月 27, 2022
作者:
C
cpwu
提交者:
GitHub
7月 27, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into cpwu/3.0
上级
b14dee97
23bc13a9
变更
30
隐藏空白更改
内联
并排
Showing
30 changed file
with
661 addition
and
231 deletion
+661
-231
examples/c/tmq.c
examples/c/tmq.c
+2
-2
examples/c/tmq_taosx.c
examples/c/tmq_taosx.c
+69
-48
include/client/taos.h
include/client/taos.h
+9
-6
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+2
-2
source/client/src/tmq.c
source/client/src/tmq.c
+411
-84
source/common/src/tmsg.c
source/common/src/tmsg.c
+3
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-5
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+17
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+1
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+1
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+3
-7
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+4
-2
source/libs/transport/src/thttp.c
source/libs/transport/src/thttp.c
+4
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+6
-4
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+73
-42
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+5
-3
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+17
-1
tests/script/tsim/valgrind/checkError6.sim
tests/script/tsim/valgrind/checkError6.sim
+4
-2
tests/system-test/2-query/cast.py
tests/system-test/2-query/cast.py
+4
-8
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+6
-7
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+2
-2
未找到文件。
examples/c/tmq.c
浏览文件 @
9a930030
...
...
@@ -29,7 +29,7 @@ static void msg_process(TAOS_RES* msg) {
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
tmq_raw_data
raw
=
{
0
};
int32_t
code
=
tmq_get_raw
_meta
(
msg
,
&
raw
);
int32_t
code
=
tmq_get_raw
(
msg
,
&
raw
);
if
(
code
==
0
)
{
TAOS
*
pConn
=
taos_connect
(
"192.168.1.86"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
...
...
@@ -50,7 +50,7 @@ static void msg_process(TAOS_RES* msg) {
}
taos_free_result
(
pRes
);
int32_t
ret
=
t
aos_write_raw_meta
(
pConn
,
raw
);
int32_t
ret
=
t
mq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
taos_close
(
pConn
);
}
...
...
examples/c/tmq_taosx.c
浏览文件 @
9a930030
...
...
@@ -49,18 +49,25 @@ static void msg_process(TAOS_RES* msg) {
printf
(
"meta result: %s
\n
"
,
result
);
}
tmq_free_json_meta
(
result
);
tmq_raw_data
raw
=
{
0
};
tmq_get_raw_meta
(
msg
,
&
raw
);
int32_t
ret
=
taos_write_raw_meta
(
pConn
,
raw
);
printf
(
"write raw meta: %s
\n
"
,
tmq_err2str
(
ret
));
}
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_DATA
){
int32_t
ret
=
taos_write_raw_data
(
pConn
,
msg
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
}
tmq_raw_data
raw
=
{
0
};
tmq_get_raw
(
msg
,
&
raw
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
...
...
@@ -121,7 +128,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(
now
, 1, 2, 'a')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(
1626006833600
, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -142,7 +149,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(
now
, 3, 4, 'b')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(
1626006833600
, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -156,7 +163,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(
now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s
, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(
1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602
, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -177,7 +184,14 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 select * from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -198,19 +212,26 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes
=
taos_query
(
pConn
,
"delete from abc1 .ct3 where ts < 1626006833606"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table ct3 ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -261,12 +282,12 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
//
pRes = taos_query(pConn, "drop table n1");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
//
return -1;
//
}
//
taos_free_result(pRes);
pRes
=
taos_query
(
pConn
,
"drop table n1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -289,21 +310,21 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
//
pRes = taos_query(pConn,
//
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
//
"nchar(8), t4 bool)");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
//
return -1;
//
}
//
taos_free_result(pRes);
//
//
pRes = taos_query(pConn, "drop table st1");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
//
return -1;
//
}
//
taos_free_result(pRes);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
...
...
include/client/taos.h
浏览文件 @
9a930030
...
...
@@ -260,17 +260,20 @@ enum tmq_res_t {
};
typedef
struct
tmq_raw_data
{
void
*
raw
_meta
;
uint32_t
raw_
meta_
len
;
uint16_t
raw_
meta_
type
;
void
*
raw
;
uint32_t
raw_len
;
uint16_t
raw_type
;
}
tmq_raw_data
;
typedef
enum
tmq_res_t
tmq_res_t
;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_data
(
TAOS
*
taos
,
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
);
DLL_EXPORT
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
);
DLL_EXPORT
int
taos_write_raw_block
(
TAOS
*
taos
,
int
numOfRows
,
char
*
pData
,
const
char
*
tbname
);
DLL_EXPORT
void
tmq_free_raw
(
tmq_raw_data
raw
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
...
...
include/common/tcommon.h
浏览文件 @
9a930030
...
...
@@ -40,6 +40,7 @@ enum {
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DELETE \
)
// clang-format on
...
...
include/common/tmsg.h
浏览文件 @
9a930030
...
...
@@ -3044,6 +3044,7 @@ typedef struct SDeleteRes {
int64_t
skey
;
int64_t
ekey
;
int64_t
affectedRows
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
}
SDeleteRes
;
int32_t
tEncodeDeleteRes
(
SEncoder
*
pCoder
,
const
SDeleteRes
*
pRes
);
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
9a930030
...
...
@@ -38,6 +38,7 @@ typedef struct SDeleterRes {
int64_t
skey
;
int64_t
ekey
;
int64_t
affectedRows
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
}
SDeleterRes
;
typedef
struct
SDeleterParam
{
...
...
include/libs/nodes/plannodes.h
浏览文件 @
9a930030
...
...
@@ -503,6 +503,7 @@ typedef struct SDataDeleterNode {
uint64_t
tableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
char
tsColName
[
TSDB_COL_NAME_LEN
];
STimeWindow
deleteTimeRange
;
SNode
*
pAffectedRows
;
}
SDataDeleterNode
;
...
...
include/libs/qcom/query.h
浏览文件 @
9a930030
...
...
@@ -251,8 +251,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_
V
ND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_
V
ND_DROP_STB)
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_
M
ND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_
M
ND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
...
...
source/client/src/tmq.c
浏览文件 @
9a930030
...
...
@@ -1206,6 +1206,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
pMsg
->
len
-
sizeof
(
SMqRspHead
));
tDecodeSMqDataRsp
(
&
decoder
,
&
pRspWrapper
->
dataRsp
);
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
else
{
ASSERT
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
);
...
...
@@ -1859,6 +1860,10 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if
(
TD_RES_TMQ
(
res
))
{
return
TMQ_RES_DATA
;
}
else
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DELETE
)
{
return
TMQ_RES_DATA
;
}
return
TMQ_RES_TABLE_META
;
}
else
{
return
TMQ_RES_INVALID
;
...
...
@@ -1913,17 +1918,6 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return
NULL
;
}
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
)
{
if
(
TD_RES_TMQ_META
(
res
)
&&
raw
)
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
raw
->
raw_meta
=
pMetaRspObj
->
metaRsp
.
metaRsp
;
raw
->
raw_meta_len
=
pMetaRspObj
->
metaRsp
.
metaRspLen
;
raw
->
raw_meta_type
=
pMetaRspObj
->
metaRsp
.
resMsgType
;
return
TSDB_CODE_SUCCESS
;
}
return
TSDB_CODE_INVALID_PARA
;
}
static
char
*
buildCreateTableJson
(
SSchemaWrapper
*
schemaRow
,
SSchemaWrapper
*
schemaTag
,
char
*
name
,
int64_t
id
,
int8_t
t
)
{
char
*
string
=
NULL
;
...
...
@@ -2436,30 +2430,6 @@ _exit:
return
string
;
}
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
)
{
if
(
!
TD_RES_TMQ_META
(
res
))
{
return
NULL
;
}
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_STB
)
{
return
processCreateStb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_ALTER_STB
)
{
return
processAlterStb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DROP_STB
)
{
return
processDropSTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_TABLE
)
{
return
processCreateTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_ALTER_TABLE
)
{
return
processAlterTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DROP_TABLE
)
{
return
processDropTable
(
&
pMetaRspObj
->
metaRsp
);
}
return
NULL
;
}
void
tmq_free_json_meta
(
char
*
jsonMeta
)
{
taosMemoryFreeClear
(
jsonMeta
);
}
static
int32_t
taosCreateStb
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVCreateStbReq
req
=
{
0
};
SDecoder
coder
;
...
...
@@ -2531,6 +2501,13 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery
.
stableQuery
=
true
;
launchQueryImpl
(
pRequest
,
&
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveTableMeta
(
pCatalog
,
&
tableName
);
}
code
=
pRequest
->
code
;
taosMemoryFree
(
pCmdMsg
.
pMsg
);
...
...
@@ -2572,7 +2549,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq
.
suid
=
req
.
suid
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SName
tableName
;
SName
tableName
=
{
0
}
;
tNameExtractFullName
(
toName
(
pTscObj
->
acctId
,
pRequest
->
pDb
,
req
.
name
,
&
tableName
),
pReq
.
name
);
SCmdMsgInfo
pCmdMsg
=
{
0
};
...
...
@@ -2593,6 +2570,13 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery
.
stableQuery
=
true
;
launchQueryImpl
(
pRequest
,
&
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveTableMeta
(
pCatalog
,
&
tableName
);
}
code
=
pRequest
->
code
;
taosMemoryFree
(
pCmdMsg
.
pMsg
);
...
...
@@ -2659,17 +2643,20 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
pRequest
->
tableList
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SName
));
// loop to create table
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
SVgroupInfo
pInfo
=
{
0
};
SName
pName
;
SName
pName
=
{
0
}
;
toName
(
pTscObj
->
acctId
,
pRequest
->
pDb
,
pCreateReq
->
name
,
&
pName
);
code
=
catalogGetTableHashVgroup
(
pCatalog
,
&
conn
,
&
pName
,
&
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
taosArrayPush
(
pRequest
->
tableList
,
&
pName
);
SVgroupCreateTableBatch
*
pTableBatch
=
taosHashGet
(
pVgroupHashmap
,
&
pInfo
.
vgId
,
sizeof
(
pInfo
.
vgId
));
if
(
pTableBatch
==
NULL
)
{
...
...
@@ -2703,8 +2690,11 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
goto
end
;
}
launchQueryImpl
(
pRequest
,
pQuery
,
false
,
NULL
);
pQuery
=
NULL
;
// no need to free in the end
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
removeMeta
(
pTscObj
,
pRequest
->
tableList
);
}
code
=
pRequest
->
code
;
end:
...
...
@@ -2772,19 +2762,21 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
pRequest
->
tableList
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SName
));
// loop to create table
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pDropReq
=
req
.
pReqs
+
iReq
;
pDropReq
->
igNotExists
=
true
;
SVgroupInfo
pInfo
=
{
0
};
SName
pName
;
SName
pName
=
{
0
}
;
toName
(
pTscObj
->
acctId
,
pRequest
->
pDb
,
pDropReq
->
name
,
&
pName
);
code
=
catalogGetTableHashVgroup
(
pCatalog
,
&
conn
,
&
pName
,
&
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
taosArrayPush
(
pRequest
->
tableList
,
&
pName
);
SVgroupDropTableBatch
*
pTableBatch
=
taosHashGet
(
pVgroupHashmap
,
&
pInfo
.
vgId
,
sizeof
(
pInfo
.
vgId
));
if
(
pTableBatch
==
NULL
)
{
SVgroupDropTableBatch
tBatch
=
{
0
};
...
...
@@ -2815,8 +2807,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
goto
end
;
}
launchQueryImpl
(
pRequest
,
pQuery
,
false
,
NULL
);
pQuery
=
NULL
;
// no need to free in the end
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
removeMeta
(
pTscObj
,
pRequest
->
tableList
);
}
code
=
pRequest
->
code
;
end:
...
...
@@ -2827,6 +2821,70 @@ end:
return
code
;
}
// delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where ..
static
void
getTbName
(
char
*
sql
){
char
*
ch
=
sql
;
bool
inBackQuote
=
false
;
int8_t
dotIndex
=
0
;
while
(
*
ch
!=
'\0'
){
if
(
!
inBackQuote
&&
*
ch
==
'`'
){
inBackQuote
=
true
;
ch
++
;
continue
;
}
if
(
inBackQuote
&&
*
ch
==
'`'
){
inBackQuote
=
false
;
ch
++
;
continue
;
}
if
(
!
inBackQuote
&&
*
ch
==
'.'
){
dotIndex
++
;
if
(
dotIndex
==
2
){
memmove
(
sql
,
ch
+
1
,
strlen
(
ch
+
1
)
+
1
);
break
;
}
}
ch
++
;
}
}
static
int32_t
taosDeleteData
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SDeleteRes
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
// decode and process req
void
*
data
=
POINTER_SHIFT
(
meta
,
sizeof
(
SMsgHead
));
int32_t
len
=
metaLen
-
sizeof
(
SMsgHead
);
tDecoderInit
(
&
coder
,
data
,
len
);
if
(
tDecodeDeleteRes
(
&
coder
,
&
req
)
<
0
)
{
code
=
TSDB_CODE_INVALID_PARA
;
goto
end
;
}
getTbName
(
req
.
tableFName
);
char
sql
[
256
]
=
{
0
};
sprintf
(
sql
,
"delete from `%s` where `%s` >= %"
PRId64
" and `%s` <= %"
PRId64
,
req
.
tableFName
,
"ts"
,
req
.
skey
,
"ts"
,
req
.
ekey
);
printf
(
"delete sql:%s
\n
"
,
sql
);
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
code
=
pRequest
->
code
;
if
(
code
==
TSDB_CODE_PAR_TABLE_NOT_EXIST
)
{
code
=
TSDB_CODE_SUCCESS
;
}
taos_free_result
(
res
);
end:
tDecoderClear
(
&
coder
);
return
code
;
}
static
int32_t
taosAlterTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVAlterTbReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
...
...
@@ -2914,15 +2972,21 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto
end
;
}
launchQueryImpl
(
pRequest
,
pQuery
,
fals
e
,
NULL
);
pQuery
=
NULL
;
// no need to free in the end
launchQueryImpl
(
pRequest
,
pQuery
,
tru
e
,
NULL
);
pVgData
=
NULL
;
pArray
=
NULL
;
code
=
pRequest
->
code
;
if
(
code
==
TSDB_CODE_VND_TABLE_NOT_EXIST
)
{
code
=
0
;
code
=
TSDB_CODE_SUCCESS
;
}
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
SExecResult
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
if
(
pRes
->
res
!=
NULL
){
code
=
handleAlterTbExecRes
(
pRes
->
res
,
pCatalog
);
}
}
end:
taosArrayDestroy
(
pArray
);
if
(
pVgData
)
taosMemoryFreeClear
(
pVgData
->
pData
);
...
...
@@ -2933,27 +2997,6 @@ end:
return
code
;
}
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
raw_meta
){
if
(
!
taos
)
{
return
TSDB_CODE_INVALID_PARA
;
}
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_CREATE_STB
)
{
return
taosCreateStb
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
else
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_ALTER_STB
){
return
taosCreateStb
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
else
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_DROP_STB
){
return
taosDropStb
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
else
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_CREATE_TABLE
){
return
taosCreateTable
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
else
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_ALTER_TABLE
){
return
taosAlterTable
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
else
if
(
raw_meta
.
raw_meta_type
==
TDMT_VND_DROP_TABLE
){
return
taosDropTable
(
taos
,
raw_meta
.
raw_meta
,
raw_meta
.
raw_meta_len
);
}
return
TSDB_CODE_INVALID_PARA
;
}
typedef
struct
{
SVgroupInfo
vg
;
void
*
data
;
...
...
@@ -2964,15 +3007,196 @@ static void destroyVgHash(void* data) {
taosMemoryFreeClear
(
vgData
->
data
);
}
int32_t
taos_write_raw_data
(
TAOS
*
taos
,
TAOS_RES
*
msg
){
if
(
!
TD_RES_TMQ
(
msg
))
{
uError
(
"WriteRaw:msg is not tmq : %d"
,
*
(
int8_t
*
)
msg
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
int
taos_write_raw_block
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
){
uError
(
"WriteRaw:createRequest error request is null"
);
code
=
terrno
;
goto
end
;
}
if
(
!
pRequest
->
pDb
)
{
uError
(
"WriteRaw:not use db"
);
code
=
TSDB_CODE_PAR_DB_NOT_SPECIFIED
;
goto
end
;
}
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbname
);
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
uError
(
"WriteRaw: get gatlog error"
);
goto
end
;
}
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
conn
.
requestId
=
pRequest
->
requestId
;
conn
.
requestObjRefId
=
pRequest
->
self
;
conn
.
mgmtEps
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SVgroupInfo
vgData
=
{
0
};
code
=
catalogGetTableHashVgroup
(
pCatalog
,
&
conn
,
&
pName
,
&
vgData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableHashVgroup failed. table name: %s"
,
tbname
);
goto
end
;
}
code
=
catalogGetTableMeta
(
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableMeta failed. table name: %s"
,
tbname
);
goto
end
;
}
uint64_t
suid
=
(
TSDB_NORMAL_TABLE
==
pTableMeta
->
tableType
?
0
:
pTableMeta
->
suid
);
uint64_t
uid
=
pTableMeta
->
uid
;
int32_t
numOfCols
=
pTableMeta
->
tableInfo
.
numOfColumns
;
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
SSchema
*
schema
=
pTableMeta
->
schema
+
i
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
)){
nVar
++
;
}
}
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
numOfCols
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
SSubmitReq
*
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pTableMeta
->
sversion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
fLen
);
int32_t
dataLen
=
0
;
char
*
pStart
=
pData
+
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int16_t
)
+
sizeof
(
int32_t
));
int32_t
*
colLength
=
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
)
*
numOfCols
;
SResultColumn
*
pCol
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SResultColumn
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
(
pTableMeta
->
schema
[
i
].
type
))
{
pCol
[
i
].
offset
=
(
int32_t
*
)
pStart
;
pStart
+=
rows
*
sizeof
(
int32_t
);
}
else
{
pCol
[
i
].
nullbitmap
=
pStart
;
pStart
+=
BitmapLen
(
rows
);
}
pCol
[
i
].
pData
=
pStart
;
pStart
+=
colLength
[
i
];
}
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
tdSRowResetBuf
(
&
rb
,
rowData
);
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
numOfCols
;
k
++
)
{
const
SSchema
*
pColumn
=
&
pTableMeta
->
schema
[
k
];
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
if
(
pCol
[
k
].
offset
[
j
]
!=
-
1
)
{
char
*
data
=
pCol
[
k
].
pData
+
pCol
[
k
].
offset
[
j
];
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
}
else
{
if
(
!
colDataIsNull_f
(
pCol
[
k
].
nullbitmap
,
j
))
{
char
*
data
=
pCol
[
k
].
pData
+
pColumn
->
bytes
*
j
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
}
offset
+=
TYPE_BYTES
[
pColumn
->
type
];
}
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
dataLen
+=
rowLen
;
}
taosMemoryFree
(
pCol
);
blk
->
uid
=
htobe64
(
uid
);
blk
->
suid
=
htobe64
(
suid
);
blk
->
padding
=
htonl
(
blk
->
padding
);
blk
->
sversion
=
htonl
(
pTableMeta
->
sversion
);
blk
->
schemaLen
=
htonl
(
schemaLen
);
blk
->
numOfRows
=
htons
(
rows
);
blk
->
dataLen
=
htonl
(
dataLen
);
subReq
->
length
=
sizeof
(
SSubmitReq
)
+
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
subReq
->
numOfBlocks
=
1
;
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
pQuery
->
pRoot
)
{
uError
(
"create pQuery->pRoot error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
nodeStmt
->
payloadType
=
PAYLOAD_TYPE_KV
;
nodeStmt
->
pDataBlocks
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
dst
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
dst
->
vg
=
vgData
;
dst
->
numOfTables
=
subReq
->
numOfBlocks
;
dst
->
size
=
subReq
->
length
;
dst
->
pData
=
(
char
*
)
subReq
;
subReq
->
header
.
vgId
=
htonl
(
dst
->
vg
.
vgId
);
subReq
->
version
=
htonl
(
1
);
subReq
->
header
.
contLen
=
htonl
(
subReq
->
length
);
subReq
->
length
=
htonl
(
subReq
->
length
);
subReq
->
numOfBlocks
=
htonl
(
subReq
->
numOfBlocks
);
subReq
=
NULL
;
// no need free
taosArrayPush
(
nodeStmt
->
pDataBlocks
,
&
dst
);
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
return
code
;
}
static
int32_t
tmqWriteRaw
(
TAOS
*
taos
,
void
*
data
,
int32_t
dataLen
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
pVgHash
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SMqRspObj
rspObj
=
{
0
};
SDecoder
decoder
=
{
0
};
terrno
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
...
...
@@ -2981,6 +3205,17 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
return
terrno
;
}
rspObj
.
resIter
=
-
1
;
rspObj
.
resType
=
RES_TYPE__TMQ
;
tDecoderInit
(
&
decoder
,
data
,
dataLen
);
code
=
tDecodeSMqDataRsp
(
&
decoder
,
&
rspObj
.
rsp
);
if
(
code
!=
0
){
uError
(
"WriteRaw:decode smqDataRsp error"
);
code
=
TSDB_CODE_INVALID_MSG
;
goto
end
;
}
if
(
!
pRequest
->
pDb
)
{
uError
(
"WriteRaw:not use db"
);
code
=
TSDB_CODE_PAR_DB_NOT_SPECIFIED
;
...
...
@@ -3001,18 +3236,18 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
conn
.
requestId
=
pRequest
->
requestId
;
conn
.
requestObjRefId
=
pRequest
->
self
;
conn
.
mgmtEps
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SMqRspObj
*
rspObj
=
((
SMqRspObj
*
)
msg
);
printf
(
"raw data block num:%d
\n
"
,
rspObj
->
rsp
.
blockNum
);
while
(
++
rspObj
->
resIter
<
rspObj
->
rsp
.
blockNum
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
rspObj
->
rsp
.
blockData
,
rspObj
->
resIter
);
if
(
!
rspObj
->
rsp
.
withSchema
)
{
uError
(
"WriteRaw:no schema, iter:%d"
,
rspObj
->
resIter
);
printf
(
"raw data block num:%d
\n
"
,
rspObj
.
rsp
.
blockNum
);
while
(
++
rspObj
.
resIter
<
rspObj
.
rsp
.
blockNum
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
rspObj
.
rsp
.
blockData
,
rspObj
.
resIter
);
if
(
!
rspObj
.
rsp
.
withSchema
)
{
uError
(
"WriteRaw:no schema, iter:%d"
,
rspObj
.
resIter
);
goto
end
;
}
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
rspObj
->
rsp
.
blockSchema
,
rspObj
->
resIter
);
setResSchemaInfo
(
&
rspObj
->
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
rspObj
.
rsp
.
blockSchema
,
rspObj
.
resIter
);
setResSchemaInfo
(
&
rspObj
.
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
code
=
setQueryResultFromRsp
(
&
rspObj
->
resInfo
,
pRetrieve
,
false
,
false
);
code
=
setQueryResultFromRsp
(
&
rspObj
.
resInfo
,
pRetrieve
,
false
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
uError
(
"WriteRaw: setQueryResultFromRsp error"
);
goto
end
;
...
...
@@ -3030,13 +3265,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
}
}
int32_t
rows
=
rspObj
->
resInfo
.
numOfRows
;
int32_t
rows
=
rspObj
.
resInfo
.
numOfRows
;
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
pSW
->
nCols
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
const
char
*
tbName
=
(
const
char
*
)
taosArrayGetP
(
rspObj
.
rsp
.
blockTbName
,
rspObj
.
resIter
);
if
(
!
tbName
){
uError
(
"WriteRaw: tbname is null"
);
code
=
TSDB_CODE_TMQ_INVALID_MSG
;
...
...
@@ -3108,13 +3343,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
tdSRowResetBuf
(
&
rb
,
rowData
);
doSetOneRowPtr
(
&
rspObj
->
resInfo
);
rspObj
->
resInfo
.
current
+=
1
;
doSetOneRowPtr
(
&
rspObj
.
resInfo
);
rspObj
.
resInfo
.
current
+=
1
;
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
pSW
->
nCols
;
k
++
)
{
const
SSchema
*
pColumn
=
&
pSW
->
pSchema
[
k
];
char
*
data
=
rspObj
->
resInfo
.
row
[
k
];
char
*
data
=
rspObj
.
resInfo
.
row
[
k
];
if
(
!
data
)
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
{
...
...
@@ -3186,13 +3421,105 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
tDecoderClear
(
&
decoder
);
taos_free_result
(
&
rspObj
);
qDestroyQuery
(
pQuery
);
destroyRequest
(
pRequest
);
taosHashCleanup
(
pVgHash
);
return
code
;
}
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
)
{
if
(
!
TD_RES_TMQ_META
(
res
))
{
return
NULL
;
}
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_STB
)
{
return
processCreateStb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_ALTER_STB
)
{
return
processAlterStb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DROP_STB
)
{
return
processDropSTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_TABLE
)
{
return
processCreateTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_ALTER_TABLE
)
{
return
processAlterTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DROP_TABLE
)
{
return
processDropTable
(
&
pMetaRspObj
->
metaRsp
);
}
return
NULL
;
}
void
tmq_free_json_meta
(
char
*
jsonMeta
)
{
taosMemoryFreeClear
(
jsonMeta
);
}
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
)
{
if
(
!
raw
||
!
res
){
return
TSDB_CODE_INVALID_PARA
;
}
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
raw
->
raw
=
pMetaRspObj
->
metaRsp
.
metaRsp
;
raw
->
raw_len
=
pMetaRspObj
->
metaRsp
.
metaRspLen
;
raw
->
raw_type
=
pMetaRspObj
->
metaRsp
.
resMsgType
;
}
else
if
(
TD_RES_TMQ
(
res
)){
SMqRspObj
*
rspObj
=
((
SMqRspObj
*
)
res
);
int32_t
len
=
0
;
int32_t
code
=
0
;
tEncodeSize
(
tEncodeSMqDataRsp
,
&
rspObj
->
rsp
,
len
,
code
);
if
(
code
<
0
)
{
return
-
1
;
}
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
len
);
tEncodeSMqDataRsp
(
&
encoder
,
&
rspObj
->
rsp
);
tEncoderClear
(
&
encoder
);
raw
->
raw
=
buf
;
raw
->
raw_len
=
len
;
raw
->
raw_type
=
RES_TYPE__TMQ
;
}
else
{
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
return
TSDB_CODE_SUCCESS
;
}
void
tmq_free_raw
(
tmq_raw_data
raw
)
{
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
){
taosMemoryFree
(
raw
.
raw
);
}
}
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
){
if
(
!
taos
)
{
return
TSDB_CODE_INVALID_PARA
;
}
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_STB
)
{
return
taosCreateStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_STB
){
return
taosCreateStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_STB
){
return
taosDropStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_TABLE
){
return
taosCreateTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_TABLE
){
return
taosAlterTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_TABLE
)
{
return
taosDropTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DELETE
){
return
taosDeleteData
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
){
return
tmqWriteRaw
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
return
TSDB_CODE_INVALID_PARA
;
}
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
//
tmqCommitInner2
(
tmq
,
msg
,
0
,
1
,
cb
,
param
);
...
...
source/common/src/tmsg.c
浏览文件 @
9a930030
...
...
@@ -5681,6 +5681,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if
(
tEncodeI64
(
pCoder
,
pRes
->
ekey
)
<
0
)
return
-
1
;
if
(
tEncodeI64v
(
pCoder
,
pRes
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pRes
->
tableFName
)
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -5692,12 +5693,13 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if
(
tDecodeI32v
(
pCoder
,
&
nUid
)
<
0
)
return
-
1
;
for
(
int32_t
iUid
=
0
;
iUid
<
nUid
;
iUid
++
)
{
if
(
tDecodeU64
(
pCoder
,
&
uid
)
<
0
)
return
-
1
;
taosArrayPush
(
pRes
->
uidList
,
&
uid
);
if
(
pRes
->
uidList
)
taosArrayPush
(
pRes
->
uidList
,
&
uid
);
}
if
(
tDecodeI64
(
pCoder
,
&
pRes
->
skey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pRes
->
ekey
)
<
0
)
return
-
1
;
if
(
tDecodeI64v
(
pCoder
,
&
pRes
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pCoder
,
pRes
->
tableFName
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tEncodeSMqDataRsp
(
SEncoder
*
pEncoder
,
const
SMqDataRsp
*
pRsp
)
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
9a930030
...
...
@@ -146,8 +146,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
}
}
int32_t
len
;
int32_t
code
;
int32_t
len
=
0
;
int32_t
code
=
0
;
tEncodeSize
(
tEncodeSMqDataRsp
,
pRsp
,
len
,
code
);
if
(
code
<
0
)
{
return
-
1
;
...
...
@@ -164,9 +164,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
SEncoder
encoder
;
SEncoder
encoder
=
{
0
}
;
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSMqDataRsp
(
&
encoder
,
pRsp
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rsp
=
{
.
info
=
pMsg
->
info
,
...
...
@@ -176,8 +177,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
};
tmsgSendRsp
(
&
rsp
);
char
buf1
[
80
];
char
buf2
[
80
];
char
buf1
[
80
]
=
{
0
}
;
char
buf2
[
80
]
=
{
0
}
;
tFormatOffset
(
buf1
,
80
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
80
,
&
pRsp
->
rspOffset
);
tqDebug
(
"vgId:%d from consumer:%"
PRId64
", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s"
,
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
9a930030
...
...
@@ -106,7 +106,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
,
.
pMsgCb
=
&
pVnode
->
msgCb
};
code
=
qWorkerProcessDeleteMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
,
&
res
);
if
(
code
)
goto
_err
;
if
(
code
)
{
goto
_err
;
}
// malloc and encode
tEncodeSize
(
tEncodeDeleteRes
,
&
res
,
size
,
ret
);
...
...
@@ -993,6 +995,11 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
SDecoder
*
pCoder
=
&
(
SDecoder
){
0
};
SDeleteRes
*
pRes
=
&
(
SDeleteRes
){
0
};
pRsp
->
msgType
=
TDMT_VND_DELETE_RSP
;
pRsp
->
pCont
=
NULL
;
pRsp
->
contLen
=
0
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
pRes
->
uidList
=
taosArrayInit
(
0
,
sizeof
(
tb_uid_t
));
if
(
pRes
->
uidList
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1010,6 +1017,15 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderClear
(
pCoder
);
taosArrayDestroy
(
pRes
->
uidList
);
SVDeleteRsp
rsp
=
{.
affectedRows
=
pRes
->
affectedRows
};
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVDeleteRsp
,
&
rsp
,
pRsp
->
contLen
,
ret
);
pRsp
->
pCont
=
rpcMallocCont
(
pRsp
->
contLen
);
SEncoder
ec
=
{
0
};
tEncoderInit
(
&
ec
,
pRsp
->
pCont
,
pRsp
->
contLen
);
tEncodeSVDeleteRsp
(
&
ec
,
&
rsp
);
tEncoderClear
(
&
ec
);
return
code
;
_err:
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9a930030
...
...
@@ -1021,6 +1021,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
int32_t
generateGroupIdMap
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SNodeList
*
groupKey
);
SSDataBlock
*
createSpecialDataBlock
(
EStreamType
type
);
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
9a930030
...
...
@@ -90,6 +90,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pRes
->
uidList
=
pHandle
->
pParam
->
pUidList
;
pRes
->
skey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
skey
;
pRes
->
ekey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
ekey
;
strcpy
(
pRes
->
tableFName
,
pHandle
->
pDeleter
->
tableFName
);
pRes
->
affectedRows
=
*
(
int64_t
*
)
pColRes
->
pData
;
pBuf
->
useSize
+=
pEntry
->
dataLen
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
9a930030
...
...
@@ -347,6 +347,9 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
}
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
taosMemoryFreeClear
(
pSinkParam
);
}
}
_error:
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
9a930030
...
...
@@ -3436,7 +3436,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
initResultRowInfo
(
&
pInfo
->
resultRowInfo
);
}
static
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
if
(
pCtx
==
NULL
)
{
return
NULL
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
9a930030
...
...
@@ -3147,6 +3147,8 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy
(
pInfo
->
pDelRes
);
blockDataDestroy
(
pInfo
->
pWinBlock
);
blockDataDestroy
(
pInfo
->
pUpdateRes
);
destroySqlFunctionCtx
(
pInfo
->
pDummyCtx
,
0
);
taosHashCleanup
(
pInfo
->
pStDeleted
);
taosMemoryFreeClear
(
param
);
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
9a930030
...
...
@@ -283,7 +283,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes
->
skey
=
pDelRes
->
skey
;
pRes
->
ekey
=
pDelRes
->
ekey
;
pRes
->
affectedRows
=
pDelRes
->
affectedRows
;
strcpy
(
pRes
->
tableFName
,
pDelRes
->
tableFName
);
taosMemoryFree
(
output
.
pData
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
9a930030
...
...
@@ -230,6 +230,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SVDeleteRsp
rsp
=
{
0
};
tDecoderInit
(
&
coder
,
msg
,
msgSize
);
tDecodeSVDeleteRsp
(
&
coder
,
&
rsp
);
tDecoderClear
(
&
coder
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
rsp
.
affectedRows
);
SCH_TASK_DLOG
(
"delete succeed, affectedRows:%"
PRId64
,
rsp
.
affectedRows
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
9a930030
...
...
@@ -2501,19 +2501,15 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
// if mulit replica, start replicate right now
if
(
ths
->
replicaNum
>
1
)
{
syncNodeReplicate
(
ths
);
}
// pre commit
syncNodePreCommit
(
ths
,
pEntry
,
0
);
// pre commit
syncNodePreCommit
(
ths
,
pEntry
,
0
);
}
// if only myself, maybe commit right now
if
(
ths
->
replicaNum
==
1
)
{
syncMaybeAdvanceCommitIndex
(
ths
);
}
}
else
{
// pre commit
syncNodePreCommit
(
ths
,
pEntry
,
0
);
}
if
(
pRetIndex
!=
NULL
)
{
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
9a930030
...
...
@@ -7,8 +7,7 @@
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* FITNESS FOR A PARTICULAR PURPOSE. *
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
...
...
@@ -211,6 +210,7 @@ typedef struct SConnBuffer {
char
*
buf
;
int
len
;
int
cap
;
int
left
;
int
total
;
}
SConnBuffer
;
...
...
@@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf);
int
transDestroyBuffer
(
SConnBuffer
*
buf
);
int
transAllocBuffer
(
SConnBuffer
*
connBuf
,
uv_buf_t
*
uvBuf
);
bool
transReadComplete
(
SConnBuffer
*
connBuf
);
int
transResetBuffer
(
SConnBuffer
*
connBuf
);
int
transDumpFromBuffer
(
SConnBuffer
*
connBuf
,
char
**
buf
);
int
transSetConnOption
(
uv_tcp_t
*
stream
);
...
...
source/libs/transport/src/thttp.c
浏览文件 @
9a930030
...
...
@@ -17,6 +17,7 @@
#ifdef USE_UV
#include <uv.h>
#endif
// clang-format off
#include "zlib.h"
#include "thttp.h"
#include "taoserror.h"
...
...
@@ -174,7 +175,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
#else
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
char
*
pCont
,
int32_t
contLen
,
EHttpCompFlag
flag
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
TdSocketPtr
pSocket
=
NULL
;
uint32_t
ip
=
taosGetIpv4FromFqdn
(
server
);
...
...
@@ -231,4 +232,5 @@ SEND_OVER:
return
code
;
}
#endif
\ No newline at end of file
// clang-format on
#endif
source/libs/transport/src/transCli.c
浏览文件 @
9a930030
...
...
@@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd
*
pThrd
=
conn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)(
conn
->
readBuf
.
buf
);
STransMsgHead
*
pHead
=
NULL
;
transDumpFromBuffer
(
&
conn
->
readBuf
,
(
char
**
)
&
pHead
);
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
...
...
@@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) {
}
}
// buf's mem alread translated to transMsg.pCont
transClearBuffer
(
&
conn
->
readBuf
);
if
(
!
CONN_NO_PERSIST_BY_APP
(
conn
))
{
transMsg
.
info
.
handle
=
(
void
*
)
conn
->
refId
;
tDebug
(
"%s conn %p ref by app"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
...
...
@@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit
(
&
conn
->
wreqQueue
);
transQueueInit
(
&
conn
->
cliMsgs
,
NULL
);
transInitBuffer
(
&
conn
->
readBuf
);
QUEUE_INIT
(
&
conn
->
q
);
conn
->
hostThrd
=
pThrd
;
conn
->
status
=
ConnNormal
;
...
...
@@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_REMOVE
(
&
conn
->
q
);
QUEUE_INIT
(
&
conn
->
q
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
conn
->
refId
=
-
1
;
transDestroyBuffer
(
&
conn
->
readBuf
)
;
conn
->
refId
=
-
1
;
if
(
conn
->
task
!=
NULL
)
transDQCancel
(((
SCliThrd
*
)
conn
->
hostThrd
)
->
timeoutQueue
,
conn
->
task
);
if
(
clear
)
{
...
...
@@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) {
tTrace
(
"%s conn %p destroy successfully"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transReqQueueClear
(
&
conn
->
wreqQueue
);
transDestroyBuffer
(
&
conn
->
readBuf
);
taosMemoryFree
(
conn
);
}
static
bool
cliHandleNoResp
(
SCliConn
*
conn
)
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
9a930030
...
...
@@ -16,6 +16,8 @@
#include "transComm.h"
#define BUFFER_CAP 4096
static
TdThreadOnce
transModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
refMgt
;
...
...
@@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
return
r
;
}
int
transInitBuffer
(
SConnBuffer
*
buf
)
{
transClearBuffer
(
buf
);
buf
->
cap
=
BUFFER_CAP
;
buf
->
buf
=
taosMemoryCalloc
(
1
,
BUFFER_CAP
);
buf
->
left
=
-
1
;
buf
->
len
=
0
;
buf
->
total
=
0
;
return
0
;
}
int
transDestroyBuffer
(
SConnBuffer
*
buf
)
{
taosMemoryFree
(
buf
->
buf
);
return
0
;
}
int
transClearBuffer
(
SConnBuffer
*
buf
)
{
memset
(
buf
,
0
,
sizeof
(
*
buf
));
buf
->
total
=
-
1
;
SConnBuffer
*
p
=
buf
;
if
(
p
->
cap
>
BUFFER_CAP
)
{
p
->
cap
=
BUFFER_CAP
;
p
->
buf
=
taosMemoryRealloc
(
p
->
buf
,
BUFFER_CAP
);
}
p
->
left
=
-
1
;
p
->
len
=
0
;
p
->
total
=
0
;
return
0
;
}
int
transDumpFromBuffer
(
SConnBuffer
*
connBuf
,
char
**
buf
)
{
SConnBuffer
*
p
=
connBuf
;
if
(
p
->
left
!=
0
)
{
return
-
1
;
}
int
total
=
connBuf
->
total
;
*
buf
=
taosMemoryCalloc
(
1
,
total
);
memcpy
(
*
buf
,
p
->
buf
,
total
);
transResetBuffer
(
connBuf
);
return
total
;
}
int
transResetBuffer
(
SConnBuffer
*
connBuf
)
{
SConnBuffer
*
p
=
connBuf
;
if
(
p
->
total
<=
p
->
len
)
{
int
left
=
p
->
len
-
p
->
total
;
memmove
(
p
->
buf
,
p
->
buf
+
p
->
total
,
left
);
p
->
left
=
-
1
;
p
->
total
=
0
;
p
->
len
=
left
;
}
else
{
p
->
left
=
-
1
;
p
->
total
=
0
;
p
->
len
=
0
;
}
return
0
;
}
int
transAllocBuffer
(
SConnBuffer
*
connBuf
,
uv_buf_t
*
uvBuf
)
{
...
...
@@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
* info--->|
*/
static
const
int
CAPACITY
=
sizeof
(
STransMsgHead
);
SConnBuffer
*
p
=
connBuf
;
if
(
p
->
cap
==
0
)
{
p
->
buf
=
(
char
*
)
taosMemoryCalloc
(
CAPACITY
,
sizeof
(
char
));
tTrace
(
"internal malloc mem:%p, size:%d"
,
p
->
buf
,
CAPACITY
);
p
->
len
=
0
;
p
->
cap
=
CAPACITY
;
p
->
total
=
-
1
;
uvBuf
->
base
=
p
->
buf
;
uvBuf
->
len
=
CAPACITY
;
}
else
if
(
p
->
total
==
-
1
&&
p
->
len
<
CAPACITY
)
{
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
len
=
CAPACITY
-
p
->
len
;
}
else
{
p
->
cap
=
p
->
total
;
p
->
buf
=
taosMemoryRealloc
(
p
->
buf
,
p
->
cap
);
tTrace
(
"internal realloc mem:%p, size:%d"
,
p
->
buf
,
p
->
cap
);
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
if
(
p
->
left
==
-
1
)
{
uvBuf
->
len
=
p
->
cap
-
p
->
len
;
}
else
{
if
(
p
->
left
<
p
->
cap
-
p
->
len
)
{
uvBuf
->
len
=
p
->
left
;
}
else
{
p
->
buf
=
taosMemoryRealloc
(
p
->
buf
,
p
->
left
+
p
->
len
);
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
len
=
p
->
left
;
}
}
return
0
;
}
// check whether already read complete
bool
transReadComplete
(
SConnBuffer
*
connBuf
)
{
if
(
connBuf
->
total
==
-
1
&&
connBuf
->
len
>=
sizeof
(
STransMsgHead
))
{
STransMsgHead
head
;
memcpy
((
char
*
)
&
head
,
connBuf
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
connBuf
->
total
=
msgLen
;
}
if
(
connBuf
->
len
==
connBuf
->
cap
&&
connBuf
->
total
==
connBuf
->
cap
)
{
return
true
;
}
return
false
;
}
int
transPackMsg
(
STransMsgHead
*
msgHead
,
bool
sercured
,
bool
auth
)
{
return
0
;
}
int
transUnpackMsg
(
STransMsgHead
*
msgHead
)
{
return
0
;
}
int
transDestroyBuffer
(
SConnBuffer
*
buf
)
{
if
(
buf
->
cap
>
0
)
{
taosMemoryFreeClear
(
buf
->
buf
);
SConnBuffer
*
p
=
connBuf
;
if
(
p
->
len
>=
sizeof
(
STransMsgHead
))
{
if
(
p
->
left
==
-
1
)
{
STransMsgHead
head
;
memcpy
((
char
*
)
&
head
,
connBuf
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
p
->
total
=
msgLen
;
}
if
(
p
->
total
>=
p
->
len
)
{
p
->
left
=
p
->
total
-
p
->
len
;
}
else
{
p
->
left
=
0
;
}
}
transClearBuffer
(
buf
);
return
0
;
return
p
->
left
==
0
?
true
:
false
;
}
int
transSetConnOption
(
uv_tcp_t
*
stream
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
9a930030
...
...
@@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
}
static
void
uvHandleReq
(
SSvrConn
*
pConn
)
{
SConnBuffer
*
pBuf
=
&
pConn
->
readBuf
;
char
*
msg
=
pBuf
->
buf
;
uint32_t
msgLen
=
pBuf
->
len
;
STransMsgHead
*
msg
=
NULL
;
int
msgLen
=
0
;
msgLen
=
transDumpFromBuffer
(
&
pConn
->
readBuf
,
(
char
**
)
&
msg
);
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)
msg
;
pHead
->
code
=
htonl
(
pHead
->
code
);
...
...
@@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) {
memset
(
&
pConn
->
regArg
,
0
,
sizeof
(
pConn
->
regArg
));
pConn
->
broken
=
false
;
pConn
->
status
=
ConnNormal
;
transInitBuffer
(
&
pConn
->
readBuf
);
SExHandle
*
exh
=
taosMemoryMalloc
(
sizeof
(
SExHandle
));
exh
->
handle
=
pConn
;
...
...
source/os/src/osSysinfo.c
浏览文件 @
9a930030
...
...
@@ -374,9 +374,10 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
size_t
size
=
0
;
int32_t
done
=
0
;
int32_t
code
=
-
1
;
float
coreCount
=
0
;
TdFilePtr
pFile
=
taosOpenFile
(
"/proc/cpuinfo"
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
pFile
==
NULL
)
return
fals
e
;
if
(
pFile
==
NULL
)
return
cod
e
;
while
(
done
!=
3
&&
(
size
=
taosGetLineFile
(
pFile
,
&
line
))
!=
-
1
)
{
line
[
size
-
1
]
=
'\0'
;
...
...
@@ -390,11 +391,26 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
*
numOfCores
=
atof
(
v
);
done
|=
2
;
}
if
(
strncmp
(
line
,
"processor"
,
9
)
==
0
)
coreCount
+=
1
;
}
if
(
line
!=
NULL
)
taosMemoryFree
(
line
);
taosCloseFile
(
&
pFile
);
if
(
code
!=
0
)
{
TdFilePtr
pFile1
=
taosOpenFile
(
"/proc/device-tree/model"
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
pFile1
==
NULL
)
return
code
;
taosGetsFile
(
pFile1
,
maxLen
,
cpuModel
);
taosCloseFile
(
&
pFile1
);
code
=
0
;
done
|=
1
;
}
if
((
done
&
2
)
==
0
)
{
*
numOfCores
=
coreCount
;
done
|=
2
;
}
return
code
;
#endif
}
...
...
tests/script/tsim/valgrind/checkError6.sim
浏览文件 @
9a930030
...
...
@@ -60,12 +60,13 @@ sql select top(tbcol, 2) from tb1 where ts <= 1601481840000
sql select percentile(tbcol, 2) from tb1 where ts <= 1601481840000
sql select leastsquares(tbcol, 1, 1) as b from tb1 where ts <= 1601481840000
sql show table distributed tb1
sql select count(1) from tb1
sql select count(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m)
sql select diff(tbcol) from tb1 where ts <= 1601481840000
sql select diff(tbcol) from tb1 where tbcol > 5 and tbcol < 20
sql select first(tbcol), last(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from tb1 where ts <= 1601481840000 partition by tgcol interval(1m)
#
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select last_row(*) from tb1 where tbcol > 5 and tbcol < 20
print =============== step4: stb
...
...
@@ -78,13 +79,14 @@ sql select avg(tbcol) as b from stb where ts <= 1601481840000 interval(1m)
sql select avg(tbcol) as c from stb group by tgcol
sql select avg(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql show table distributed stb
sql select count(1) from stb
sql select count(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql select diff(tbcol) from stb where ts <= 1601481840000
sql select first(tbcol), last(tbcol) as c from stb group by tgcol
sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 and tbcol2 is null partition by tgcol interval(1m)
sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from stb where ts <= 1601481840000 partition by tgcol interval(1m)
#
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select last_row(tbcol), stddev(tbcol) from stb where tbcol > 5 and tbcol < 20 group by tgcol
_OVER:
...
...
tests/system-test/2-query/cast.py
浏览文件 @
9a930030
...
...
@@ -566,8 +566,7 @@ class TDTestCase:
if
data_ct4_c10
[
i
]
is
None
:
tdSql
.
checkData
(
i
,
0
,
None
)
else
:
# time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
time2str
=
str
(
int
((
datetime
.
datetime
.
timestamp
(
data_ct4_c10
[
i
])
-
datetime
.
datetime
.
timestamp
(
datetime
.
datetime
.
fromtimestamp
(
0
)))
*
1000
))
time2str
=
str
(
int
((
data_ct4_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
()
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdSql
.
query
(
f
"select cast(c10 as nchar(32)) as b from
{
self
.
dbname
}
.t1"
)
for
i
in
range
(
len
(
data_t1_c10
)):
...
...
@@ -576,8 +575,7 @@ class TDTestCase:
elif
i
==
10
:
continue
else
:
# time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
time2str
=
str
(
int
((
datetime
.
datetime
.
timestamp
(
data_t1_c10
[
i
])
-
datetime
.
datetime
.
timestamp
(
datetime
.
datetime
.
fromtimestamp
(
0
)))
*
1000
))
time2str
=
str
(
int
((
data_t1_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
()
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdLog
.
printNoPrefix
(
"==========step38: cast timestamp to binary, expect no changes "
)
...
...
@@ -586,8 +584,7 @@ class TDTestCase:
if
data_ct4_c10
[
i
]
is
None
:
tdSql
.
checkData
(
i
,
0
,
None
)
else
:
# time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
time2str
=
str
(
int
((
datetime
.
datetime
.
timestamp
(
data_ct4_c10
[
i
])
-
datetime
.
datetime
.
timestamp
(
datetime
.
datetime
.
fromtimestamp
(
0
)))
*
1000
))
time2str
=
str
(
int
((
data_ct4_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
()
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdSql
.
query
(
f
"select cast(c10 as binary(32)) as b from
{
self
.
dbname
}
.t1"
)
for
i
in
range
(
len
(
data_t1_c10
)):
...
...
@@ -596,8 +593,7 @@ class TDTestCase:
elif
i
==
10
:
continue
else
:
# time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
time2str
=
str
(
int
((
datetime
.
datetime
.
timestamp
(
data_t1_c10
[
i
])
-
datetime
.
datetime
.
timestamp
(
datetime
.
datetime
.
fromtimestamp
(
0
)))
*
1000
))
time2str
=
str
(
int
((
data_t1_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
()
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdLog
.
printNoPrefix
(
"==========step39: cast constant operation to bigint, expect change to int "
)
...
...
tests/system-test/fulltest.sh
浏览文件 @
9a930030
...
...
@@ -59,8 +59,8 @@ python3 ./test.py -f 2-query/ceil.py
python3 ./test.py
-f
2-query/ceil.py
-R
python3 ./test.py
-f
2-query/char_length.py
python3 ./test.py
-f
2-query/char_length.py
-R
python3 ./test.py
-f
2-query/check_tsdb.py
python3 ./test.py
-f
2-query/check_tsdb.py
-R
#
python3 ./test.py -f 2-query/check_tsdb.py
#
python3 ./test.py -f 2-query/check_tsdb.py -R
python3 ./test.py
-f
2-query/concat.py
python3 ./test.py
-f
2-query/concat.py
-R
python3 ./test.py
-f
2-query/concat_ws.py
...
...
@@ -93,7 +93,6 @@ python3 ./test.py -f 2-query/distribute_agg_min.py -R
python3 ./test.py
-f
1-insert/update_data.py
python3 ./test.py
-f
1-insert/delete_data.py
...
...
@@ -224,8 +223,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
python3 ./test.py
-f
7-tmq/tmqDnodeRestart.py
#
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
#
python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
...
...
@@ -265,7 +264,7 @@ python3 ./test.py -f 2-query/concat.py -Q 2
python3 ./test.py
-f
2-query/concat2.py
-Q
2
python3 ./test.py
-f
2-query/concat_ws.py
-Q
2
python3 ./test.py
-f
2-query/concat_ws2.py
-Q
2
python3 ./test.py
-f
2-query/check_tsdb.py
-Q
2
#
python3 ./test.py -f 2-query/check_tsdb.py -Q 2
python3 ./test.py
-f
2-query/spread.py
-Q
2
python3 ./test.py
-f
2-query/hyperloglog.py
-Q
2
python3 ./test.py
-f
2-query/explain.py
-Q
2
...
...
@@ -354,7 +353,7 @@ python3 ./test.py -f 2-query/concat.py -Q 3
python3 ./test.py
-f
2-query/concat2.py
-Q
3
python3 ./test.py
-f
2-query/concat_ws.py
-Q
3
python3 ./test.py
-f
2-query/concat_ws2.py
-Q
3
python3 ./test.py
-f
2-query/check_tsdb.py
-Q
3
#
python3 ./test.py -f 2-query/check_tsdb.py -Q 3
python3 ./test.py
-f
2-query/spread.py
-Q
3
python3 ./test.py
-f
2-query/hyperloglog.py
-Q
3
python3 ./test.py
-f
2-query/explain.py
-Q
3
...
...
tests/test/c/tmqSim.c
浏览文件 @
9a930030
...
...
@@ -630,7 +630,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
{
tmq_raw_data
raw
=
{
0
};
int32_t
code
=
tmq_get_raw
_meta
(
msg
,
&
raw
);
int32_t
code
=
tmq_get_raw
(
msg
,
&
raw
);
if
(
code
==
TSDB_CODE_SUCCESS
){
int
retCode
=
queryDB
(
pInfo
->
taos
,
"use metadb"
);
...
...
@@ -641,7 +641,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
}
taosFprintfFile
(
g_fp
,
"raw:%p
\n
"
,
&
raw
);
t
aos_write_raw_meta
(
pInfo
->
taos
,
raw
);
t
mq_write_raw
(
pInfo
->
taos
,
raw
);
}
char
*
result
=
tmq_get_json_meta
(
msg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录