Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
59fd4e19
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
59fd4e19
编写于
7月 27, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/valgrind
上级
40e67b67
7badec9e
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
682 addition
and
47 deletion
+682
-47
include/common/tmsg.h
include/common/tmsg.h
+2
-1
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+2
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+4
-3
source/client/src/tmq.c
source/client/src/tmq.c
+31
-31
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-0
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+2
-1
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+9
-2
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+3
-4
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+3
-2
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+2
-1
tests/system-test/7-tmq/tmq_taosx.py
tests/system-test/7-tmq/tmq_taosx.py
+91
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
tests/test/c/CMakeLists.txt
tests/test/c/CMakeLists.txt
+8
-0
tests/test/c/tmq_taosx_ci.c
tests/test/c/tmq_taosx_ci.c
+520
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
59fd4e19
...
...
@@ -3044,7 +3044,8 @@ typedef struct SDeleteRes {
int64_t
skey
;
int64_t
ekey
;
int64_t
affectedRows
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
char
tableFName
[
TSDB_TABLE_NAME_LEN
];
char
tsColName
[
TSDB_COL_NAME_LEN
];
}
SDeleteRes
;
int32_t
tEncodeDeleteRes
(
SEncoder
*
pCoder
,
const
SDeleteRes
*
pRes
);
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
59fd4e19
...
...
@@ -38,7 +38,8 @@ typedef struct SDeleterRes {
int64_t
skey
;
int64_t
ekey
;
int64_t
affectedRows
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
char
tsColName
[
TSDB_COL_NAME_LEN
];
}
SDeleterRes
;
typedef
struct
SDeleterParam
{
...
...
include/libs/nodes/plannodes.h
浏览文件 @
59fd4e19
...
...
@@ -151,7 +151,8 @@ typedef struct SVnodeModifyLogicNode {
uint64_t
tableId
;
uint64_t
stableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
char
tsColName
[
TSDB_COL_NAME_LEN
];
STimeWindow
deleteTimeRange
;
SVgroupsInfo
*
pVgroupList
;
SNodeList
*
pInsertCols
;
...
...
@@ -494,7 +495,7 @@ typedef struct SQueryInserterNode {
uint64_t
tableId
;
uint64_t
stableId
;
int8_t
tableType
;
// table type
char
table
FName
[
TSDB_TABLE_F
NAME_LEN
];
char
table
Name
[
TSDB_TABLE_
NAME_LEN
];
int32_t
vgId
;
SEpSet
epSet
;
}
SQueryInserterNode
;
...
...
@@ -503,7 +504,7 @@ typedef struct SDataDeleterNode {
SDataSinkNode
sink
;
uint64_t
tableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_
F
NAME_LEN
];
char
tableFName
[
TSDB_TABLE_NAME_LEN
];
char
tsColName
[
TSDB_COL_NAME_LEN
];
STimeWindow
deleteTimeRange
;
SNode
*
pAffectedRows
;
...
...
source/client/src/tmq.c
浏览文件 @
59fd4e19
...
...
@@ -2823,35 +2823,35 @@ end:
// delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where ..
static
void
getTbName
(
char
*
sql
){
char
*
ch
=
sql
;
bool
inBackQuote
=
false
;
int8_t
dotIndex
=
0
;
while
(
*
ch
!=
'\0'
){
if
(
!
inBackQuote
&&
*
ch
==
'`'
){
inBackQuote
=
true
;
ch
++
;
continue
;
}
if
(
inBackQuote
&&
*
ch
==
'`'
){
inBackQuote
=
false
;
ch
++
;
continue
;
}
if
(
!
inBackQuote
&&
*
ch
==
'.'
){
dotIndex
++
;
if
(
dotIndex
==
2
){
memmove
(
sql
,
ch
+
1
,
strlen
(
ch
+
1
)
+
1
);
break
;
}
}
ch
++
;
}
}
//
static void getTbName(char *sql){
//
char *ch = sql;
//
//
bool inBackQuote = false;
//
int8_t dotIndex = 0;
//
while(*ch != '\0'){
//
if(!inBackQuote && *ch == '`'){
//
inBackQuote = true;
//
ch++;
//
continue;
//
}
//
//
if(inBackQuote && *ch == '`'){
//
inBackQuote = false;
//
ch++;
//
//
continue;
//
}
//
//
if(!inBackQuote && *ch == '.'){
//
dotIndex ++;
//
if(dotIndex == 2){
//
memmove(sql, ch + 1, strlen(ch + 1) + 1);
//
break;
//
}
//
}
//
ch++;
//
}
//
}
static
int32_t
taosDeleteData
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SDeleteRes
req
=
{
0
};
...
...
@@ -2867,9 +2867,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
goto
end
;
}
getTbName
(
req
.
tableFName
);
//
getTbName(req.tableFName);
char
sql
[
256
]
=
{
0
};
sprintf
(
sql
,
"delete from `%s` where `%s` >= %"
PRId64
" and `%s` <= %"
PRId64
,
req
.
tableFName
,
"ts"
,
req
.
skey
,
"ts"
,
req
.
ekey
);
sprintf
(
sql
,
"delete from `%s` where `%s` >= %"
PRId64
" and `%s` <= %"
PRId64
,
req
.
tableFName
,
req
.
tsColName
,
req
.
skey
,
req
.
tsColName
,
req
.
ekey
);
printf
(
"delete sql:%s
\n
"
,
sql
);
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
...
...
source/common/src/tmsg.c
浏览文件 @
59fd4e19
...
...
@@ -5682,6 +5682,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if
(
tEncodeI64v
(
pCoder
,
pRes
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pRes
->
tableFName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pRes
->
tsColName
)
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -5700,6 +5701,7 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if
(
tDecodeI64v
(
pCoder
,
&
pRes
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pCoder
,
pRes
->
tableFName
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pCoder
,
pRes
->
tsColName
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tEncodeSMqDataRsp
(
SEncoder
*
pEncoder
,
const
SMqDataRsp
*
pRsp
)
{
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
59fd4e19
...
...
@@ -90,7 +90,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pRes
->
uidList
=
pHandle
->
pParam
->
pUidList
;
pRes
->
skey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
skey
;
pRes
->
ekey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
ekey
;
strcpy
(
pRes
->
tableFName
,
pHandle
->
pDeleter
->
tableFName
);
strcpy
(
pRes
->
tableName
,
pHandle
->
pDeleter
->
tableFName
);
strcpy
(
pRes
->
tsColName
,
pHandle
->
pDeleter
->
tsColName
);
pRes
->
affectedRows
=
*
(
int64_t
*
)
pColRes
->
pData
;
pBuf
->
useSize
+=
pEntry
->
dataLen
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
59fd4e19
...
...
@@ -401,7 +401,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
stableId
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_CHAR_ARRAY_FIELD
(
tableFName
);
COPY_CHAR_ARRAY_FIELD
(
tableName
);
COPY_CHAR_ARRAY_FIELD
(
tsColName
);
COPY_OBJECT_FIELD
(
deleteTimeRange
,
sizeof
(
STimeWindow
));
CLONE_OBJECT_FIELD
(
pVgroupList
,
vgroupsInfoClone
);
CLONE_NODE_LIST_FIELD
(
pInsertCols
);
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
59fd4e19
...
...
@@ -2332,7 +2332,7 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
code
=
tjsonAddIntegerToObject
(
pJson
,
jkQueryInsertPhysiPlanTableType
,
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddStringToObject
(
pJson
,
jkQueryInsertPhysiPlanTableFName
,
pNode
->
table
F
Name
);
code
=
tjsonAddStringToObject
(
pJson
,
jkQueryInsertPhysiPlanTableFName
,
pNode
->
tableName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkQueryInsertPhysiPlanVgId
,
pNode
->
vgId
);
...
...
@@ -2361,7 +2361,7 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
code
=
tjsonGetTinyIntValue
(
pJson
,
jkQueryInsertPhysiPlanTableType
,
&
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetStringValue
(
pJson
,
jkQueryInsertPhysiPlanTableFName
,
pNode
->
table
F
Name
);
code
=
tjsonGetStringValue
(
pJson
,
jkQueryInsertPhysiPlanTableFName
,
pNode
->
tableName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkQueryInsertPhysiPlanVgId
,
&
pNode
->
vgId
);
...
...
@@ -2376,6 +2376,7 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
static
const
char
*
jkDeletePhysiPlanTableId
=
"TableId"
;
static
const
char
*
jkDeletePhysiPlanTableType
=
"TableType"
;
static
const
char
*
jkDeletePhysiPlanTableFName
=
"TableFName"
;
static
const
char
*
jkDeletePhysiPlanTsColName
=
"TsColName"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeStartKey
=
"DeleteTimeRangeStartKey"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeEndKey
=
"DeleteTimeRangeEndKey"
;
static
const
char
*
jkDeletePhysiPlanAffectedRows
=
"AffectedRows"
;
...
...
@@ -2393,6 +2394,9 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddStringToObject
(
pJson
,
jkDeletePhysiPlanTableFName
,
pNode
->
tableFName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddStringToObject
(
pJson
,
jkDeletePhysiPlanTsColName
,
pNode
->
tsColName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeStartKey
,
pNode
->
deleteTimeRange
.
skey
);
}
...
...
@@ -2419,6 +2423,9 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetStringValue
(
pJson
,
jkDeletePhysiPlanTableFName
,
pNode
->
tableFName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetStringValue
(
pJson
,
jkDeletePhysiPlanTsColName
,
pNode
->
tsColName
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeStartKey
,
&
pNode
->
deleteTimeRange
.
skey
);
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
59fd4e19
...
...
@@ -1292,8 +1292,8 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
pModify
->
modifyType
=
MODIFY_TABLE_TYPE_DELETE
;
pModify
->
tableId
=
pRealTable
->
pMeta
->
uid
;
pModify
->
tableType
=
pRealTable
->
pMeta
->
tableType
;
snprintf
(
pModify
->
table
FName
,
sizeof
(
pModify
->
tableFName
),
"%d.%s.%s"
,
pCxt
->
pPlanCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pRealTable
->
table
.
tableN
ame
);
snprintf
(
pModify
->
table
Name
,
sizeof
(
pModify
->
tableName
),
"%s"
,
pRealTable
->
table
.
tableName
);
strcpy
(
pModify
->
tsColName
,
pRealTable
->
pMeta
->
schema
->
n
ame
);
pModify
->
deleteTimeRange
=
pDelete
->
timeRange
;
pModify
->
pAffectedRows
=
nodesCloneNode
(
pDelete
->
pCountFunc
);
if
(
NULL
==
pModify
->
pAffectedRows
)
{
...
...
@@ -1342,8 +1342,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser
pModify
->
tableId
=
pRealTable
->
pMeta
->
uid
;
pModify
->
stableId
=
pRealTable
->
pMeta
->
suid
;
pModify
->
tableType
=
pRealTable
->
pMeta
->
tableType
;
snprintf
(
pModify
->
tableFName
,
sizeof
(
pModify
->
tableFName
),
"%d.%s.%s"
,
pCxt
->
pPlanCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pRealTable
->
table
.
tableName
);
snprintf
(
pModify
->
tableName
,
sizeof
(
pModify
->
tableName
),
"%s"
,
pRealTable
->
table
.
tableName
);
TSWAP
(
pModify
->
pVgroupList
,
pRealTable
->
pVgroupList
);
pModify
->
pInsertCols
=
nodesCloneList
(
pInsert
->
pCols
);
if
(
NULL
==
pModify
->
pInsertCols
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
59fd4e19
...
...
@@ -1588,7 +1588,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
pInserter
->
tableId
=
pModify
->
tableId
;
pInserter
->
stableId
=
pModify
->
stableId
;
pInserter
->
tableType
=
pModify
->
tableType
;
strcpy
(
pInserter
->
table
FName
,
pModify
->
tableF
Name
);
strcpy
(
pInserter
->
table
Name
,
pModify
->
table
Name
);
pInserter
->
vgId
=
pModify
->
pVgroupList
->
vgroups
[
0
].
vgId
;
pInserter
->
epSet
=
pModify
->
pVgroupList
->
vgroups
[
0
].
epSet
;
vgroupInfoToNodeAddr
(
pModify
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
...
...
@@ -1638,7 +1638,8 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
pDeleter
->
tableId
=
pModify
->
tableId
;
pDeleter
->
tableType
=
pModify
->
tableType
;
strcpy
(
pDeleter
->
tableFName
,
pModify
->
tableFName
);
strcpy
(
pDeleter
->
tableFName
,
pModify
->
tableName
);
strcpy
(
pDeleter
->
tsColName
,
pModify
->
tsColName
);
pDeleter
->
deleteTimeRange
=
pModify
->
deleteTimeRange
;
int32_t
code
=
setNodeSlotId
(
pCxt
,
pRoot
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pModify
->
pAffectedRows
,
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
59fd4e19
...
...
@@ -283,7 +283,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes
->
skey
=
pDelRes
->
skey
;
pRes
->
ekey
=
pDelRes
->
ekey
;
pRes
->
affectedRows
=
pDelRes
->
affectedRows
;
strcpy
(
pRes
->
tableFName
,
pDelRes
->
tableFName
);
strcpy
(
pRes
->
tableFName
,
pDelRes
->
tableName
);
strcpy
(
pRes
->
tsColName
,
pDelRes
->
tsColName
);
taosMemoryFree
(
output
.
pData
);
return
TSDB_CODE_SUCCESS
;
...
...
tests/system-test/7-tmq/tmq_taosx.py
0 → 100644
浏览文件 @
59fd4e19
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
checkFileContent
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -c %s'
%
(
buildPath
,
cfgPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
srcFile
=
'%s/../log/tmq_taosx_tmp.source'
%
(
cfgPath
)
dstFile
=
'%s/../log/tmq_taosx_tmp.result'
%
(
cfgPath
)
tdLog
.
info
(
"compare file: %s, %s"
%
(
srcFile
,
dstFile
))
consumeFile
=
open
(
srcFile
,
mode
=
'r'
)
queryFile
=
open
(
dstFile
,
mode
=
'r'
)
while
True
:
dst
=
queryFile
.
readline
()
src
=
consumeFile
.
readline
()
if
dst
:
if
dst
!=
src
:
tdLog
.
exit
(
"compare error: %s != %s"
%
src
,
dst
)
else
:
break
tdSql
.
execute
(
'use db_taosx'
)
tdSql
.
query
(
"select * from ct3"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
51
)
tdSql
.
checkData
(
0
,
4
,
940
)
tdSql
.
checkData
(
1
,
1
,
23
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
query
(
"select * from ct1"
)
tdSql
.
checkRows
(
4
)
tdSql
.
query
(
"select * from ct2"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select * from ct0"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
3
,
"a"
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
query
(
"select * from n1"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
"eeee"
)
tdSql
.
checkData
(
1
,
2
,
940
)
tdSql
.
query
(
"select * from jt"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
11
)
tdSql
.
checkData
(
0
,
2
,
None
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
2
,
'{"k1":1,"k2":"hello"}'
)
return
def
run
(
self
):
tdSql
.
prepare
()
self
.
checkFileContent
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
59fd4e19
...
...
@@ -227,6 +227,7 @@ python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/stbTagFilter-1ctb.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal-multiCtb.py
python3 ./test.py
-f
7-tmq/tmq_taosx.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
#------------querPolicy 2-----------
...
...
tests/test/c/CMakeLists.txt
浏览文件 @
59fd4e19
add_executable
(
tmq_demo tmqDemo.c
)
add_executable
(
tmq_sim tmqSim.c
)
add_executable
(
create_table createTable.c
)
add_executable
(
tmq_taosx_ci tmq_taosx_ci.c
)
target_link_libraries
(
create_table
PUBLIC taos_static
...
...
@@ -22,6 +23,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries
(
tmq_taosx_ci
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)
add_executable
(
sdbDump sdbDump.c
)
target_link_libraries
(
...
...
tests/test/c/tmq_taosx_ci.c
0 → 100644
浏览文件 @
59fd4e19
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
static
int
running
=
1
;
TdFilePtr
g_fp
=
NULL
;
char
dir
[
64
]
=
{
0
};
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
NULL
;
}
taos_free_result
(
pRes
);
return
pConn
;
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
TAOS
*
pConn
=
use_db
();
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
taosFprintfFile
(
g_fp
,
result
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
tmq_free_json_meta
(
result
);
}
tmq_raw_data
raw
=
{
0
};
tmq_get_raw
(
msg
,
&
raw
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database if exists db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833600, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(1626006833600, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add column c4 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 modify column c3 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 select * from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add tag t2 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table ct3 set tag t1=5000"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to slter child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"delete from abc1 .ct3 where ts < 1626006833606"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 add column c3 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 modify column c2 nchar(8)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 rename column c3 cc3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt1 using jt tags('{
\"
k1
\"
:1,
\"
k2
\"
:
\"
hello
\"
}')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt1 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt2 values(now, 11)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.heartbeat.background"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
else
{
break
;
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
initLogFile
()
{
char
f1
[
256
]
=
{
0
};
char
f2
[
256
]
=
{
0
};
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp.source"
,
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp.result"
,
dir
);
TdFilePtr
pFile
=
taosOpenFile
(
f1
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f1
);
exit
(
-
1
);
}
g_fp
=
pFile
;
TdFilePtr
pFile2
=
taosOpenFile
(
f2
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile2
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f2
);
exit
(
-
1
);
}
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:16}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:3000}]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c4
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:1,
\"
colName
\"
:
\"
t2
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
alterType
\"
:4,
\"
colName
\"
:
\"
t1
\"
,
\"
colValue
\"
:
\"
5000
\"
,
\"
colValueNull
\"
:false}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:4}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c2
\"
,
\"
colType
\"
:10,
\"
colLength
\"
:8}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:10,
\"
colName
\"
:
\"
c3
\"
,
\"
colNewName
\"
:
\"
cc3
\"
}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:9}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:6,
\"
colName
\"
:
\"
c1
\"
}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
}
taosCloseFile
(
&
pFile2
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
==
3
&&
strcmp
(
argv
[
1
],
"-c"
)
==
0
)
{
strcpy
(
dir
,
argv
[
2
]);
}
else
{
strcpy
(
dir
,
"../../../sim/psim/cfg"
);
}
printf
(
"env init
\n
"
);
initLogFile
();
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile
(
&
g_fp
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录