Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b1663924
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b1663924
编写于
10月 17, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17393 from taosdata/feature/TD-14761
fix:scan converity defects
上级
7467a54a
54e3617a
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
85 addition
and
51 deletion
+85
-51
include/common/tmsg.h
include/common/tmsg.h
+1
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/client/src/clientRawBlockWrite.c
source/client/src/clientRawBlockWrite.c
+13
-7
source/client/src/clientSml.c
source/client/src/clientSml.c
+25
-11
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+3
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+1
-2
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+4
-0
tools/taosws-rs
tools/taosws-rs
+1
-0
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+4
-4
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+29
-22
utils/test/c/tmq_taosx_ci.c
utils/test/c/tmq_taosx_ci.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
b1663924
...
...
@@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
if
(
pRebInfo
==
NULL
)
{
return
NULL
;
}
strcpy
(
pRebInfo
->
key
,
key
);
tstrncpy
(
pRebInfo
->
key
,
key
,
sizeof
(
pRebInfo
->
key
)
);
pRebInfo
->
lostConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pRebInfo
->
lostConsumers
==
NULL
)
{
goto
_err
;
...
...
source/client/src/clientHb.c
浏览文件 @
b1663924
...
...
@@ -173,7 +173,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj
->
pAppInfo
->
totalDnodes
=
pRsp
->
query
->
totalDnodes
;
pTscObj
->
pAppInfo
->
onlineDnodes
=
pRsp
->
query
->
onlineDnodes
;
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
tscTrace
(
"conn %
p
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
tscTrace
(
"conn %
u
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
pTscObj
->
pAppInfo
->
totalDnodes
);
if
(
pRsp
->
query
->
killRid
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
b1663924
...
...
@@ -186,7 +186,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
STscObj
*
pTscObj
=
(
*
pRequest
)
->
pTscObj
;
if
(
taosHashPut
(
pTscObj
->
pRequests
,
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
),
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
)))
{
tscError
(
"%
d failed to add to request container, reqId:0x%"
PRIx64
", conn:%d
, %s"
,
(
*
pRequest
)
->
self
,
tscError
(
"%
"
PRIx64
" failed to add to request container, reqId:0x%"
PRIu64
", conn:%"
PRIx64
"
, %s"
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
requestId
,
pTscObj
->
id
,
sql
);
taosMemoryFree
(
param
);
...
...
@@ -371,7 +371,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
pInfo
->
pQnodeList
=
taosArrayDup
(
pNodeList
);
taosArraySort
(
pInfo
->
pQnodeList
,
compareQueryNodeLoad
);
tscDebug
(
"QnodeList updated in cluster 0x%"
PRIx64
", num:%d"
,
pInfo
->
clusterId
,
taosArrayGetSize
(
pInfo
->
pQnodeList
));
(
int
)
taosArrayGetSize
(
pInfo
->
pQnodeList
));
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
...
...
source/client/src/clientRawBlockWrite.c
浏览文件 @
b1663924
...
...
@@ -410,6 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVAlterTbReq
vAlterTbReq
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -419,7 +420,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -524,6 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropStbReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -533,7 +535,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -556,6 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropTbBatchReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -565,7 +568,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -684,7 +687,7 @@ end:
static
int32_t
taosDropStb
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVDropStbReq
req
=
{
0
};
SDecoder
coder
;
SDecoder
coder
=
{
0
}
;
SMDropStbReq
pReq
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
...
...
@@ -1212,6 +1215,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SSubmitReq
*
subReq
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
)
{
...
...
@@ -1228,8 +1232,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbname
);
tstrncpy
(
pName
.
dbname
,
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
tstrncpy
(
pName
.
tname
,
tbname
,
sizeof
(
pName
.
tname
)
);
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
...
...
@@ -1278,7 +1282,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
SSubmitReq
*
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
...
...
@@ -1352,6 +1356,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
subReq
);
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
...
...
@@ -1390,6 +1395,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
taosMemoryFree
(
subReq
);
return
code
;
}
...
...
source/client/src/clientSml.c
浏览文件 @
b1663924
...
...
@@ -299,6 +299,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
for
(;
i
<
taosArrayGetSize
(
cols
);
i
++
)
{
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosArrayGetP
(
cols
,
i
);
if
(
taosHashGet
(
hashTmp
,
kv
->
key
,
kv
->
keyLen
)
==
NULL
)
{
taosHashCleanup
(
hashTmp
);
return
-
1
;
}
}
...
...
@@ -430,7 +431,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
STableMeta
*
pTableMeta
=
NULL
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
info
->
taos
->
pAppInfo
->
pTransporter
;
...
...
@@ -874,7 +875,8 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
kv
->
i
=
ts
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1009,6 +1011,7 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
static
int32_t
smlParseTelnetTags
(
const
char
*
data
,
SArray
*
cols
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
const
char
*
sql
=
data
;
size_t
childTableNameLen
=
strlen
(
tsSmlChildTableName
);
while
(
*
sql
!=
'\0'
)
{
...
...
@@ -1082,7 +1085,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
kv
->
length
=
valueLen
;
kv
->
type
=
TSDB_DATA_TYPE_NCHAR
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1370,8 +1373,14 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj
*
s2
=
*
(
SHashObj
**
)
key2
;
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
taosHashGet
(
s1
,
TS
,
TS_LEN
);
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
taosHashGet
(
s2
,
TS
,
TS_LEN
);
ASSERT
(
kv1
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
ASSERT
(
kv2
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
if
(
!
kv1
||
kv1
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv1"
);
return
-
1
;
}
if
(
!
kv2
||
kv2
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv2"
);
return
-
1
;
}
if
(
kv1
->
i
<
kv2
->
i
)
{
return
-
1
;
}
else
if
(
kv1
->
i
>
kv2
->
i
)
{
...
...
@@ -1735,7 +1744,7 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
kv
->
i
=
tsVal
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1932,6 +1941,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
}
static
int32_t
smlParseColsFromJSON
(
cJSON
*
root
,
SArray
*
cols
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
cJSON
*
metricVal
=
cJSON_GetObjectItem
(
root
,
"value"
);
if
(
metricVal
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1941,7 +1951,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if
(
!
kv
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
kv
->
key
=
VALUE
;
kv
->
keyLen
=
VALUE_LEN
;
...
...
@@ -1955,7 +1965,9 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
static
int32_t
smlParseTagsFromJSON
(
cJSON
*
root
,
SArray
*
pKVs
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
(
!
pKVs
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
cJSON
*
tags
=
cJSON_GetObjectItem
(
root
,
"tags"
);
if
(
tags
==
NULL
||
tags
->
type
!=
cJSON_Object
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1985,14 +1997,14 @@ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableN
return
TSDB_CODE_TSC_INVALID_JSON
;
}
memset
(
childTableName
,
0
,
TSDB_TABLE_NAME_LEN
);
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
t
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
continue
;
}
// add kv to SSmlKv
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosMemoryCalloc
(
sizeof
(
SSmlKv
),
1
);
if
(
!
kv
)
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
pKVs
)
taosArrayPush
(
pKVs
,
&
kv
);
taosArrayPush
(
pKVs
,
&
kv
);
// key
kv
->
keyLen
=
keyLen
;
...
...
@@ -2103,6 +2115,8 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
if
(
!
oneTable
)
{
tinfo
=
smlBuildTableInfo
();
if
(
!
tinfo
)
{
smlDestroyCols
(
cols
);
if
(
info
->
dataFormat
)
taosArrayDestroy
(
cols
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
taosHashPut
(
info
->
childTables
,
elements
.
measure
,
elements
.
measureTagsLen
,
&
tinfo
,
POINTER_BYTES
);
...
...
@@ -2295,7 +2309,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo
*
tableData
=
*
oneTable
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
memcpy
(
pName
.
tname
,
tableData
->
childTableName
,
strlen
(
tableData
->
childTableName
));
SRequestConnInfo
conn
=
{
0
};
...
...
source/client/src/clientStmt.c
浏览文件 @
b1663924
...
...
@@ -201,6 +201,9 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
}
STableDataBlocks
**
pSrc
=
taosHashGet
(
pStmt
->
exec
.
pBlockHash
,
pStmt
->
bInfo
.
tbFName
,
strlen
(
pStmt
->
bInfo
.
tbFName
));
if
(
!
pSrc
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
STableDataBlocks
*
pDst
=
NULL
;
STMT_ERR_RET
(
qCloneStmtDataBlock
(
&
pDst
,
*
pSrc
));
...
...
source/client/src/clientTmq.c
浏览文件 @
b1663924
...
...
@@ -871,8 +871,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t
*
pTmq
=
taosMemoryCalloc
(
1
,
sizeof
(
tmq_t
));
if
(
pTmq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tscError
(
"consumer %"
PRId64
" setup failed since %s, consumer group %s"
,
pTmq
->
consumerId
,
terrstr
(),
pTmq
->
groupId
);
tscError
(
"consumer setup failed since %s"
,
terrstr
());
return
NULL
;
}
...
...
source/common/src/tdataformat.c
浏览文件 @
b1663924
...
...
@@ -916,6 +916,10 @@ char *tTagValToData(const STagVal *value, bool isJson) {
}
bool
tTagGet
(
const
STag
*
pTag
,
STagVal
*
pTagVal
)
{
if
(
!
pTag
||
!
pTagVal
){
return
false
;
}
int16_t
lidx
=
0
;
int16_t
ridx
=
pTag
->
nTag
-
1
;
int16_t
midx
;
...
...
taosws-rs
@
7a94ffab
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
utils/test/c/sml_test.c
浏览文件 @
b1663924
...
...
@@ -119,7 +119,7 @@ int smlProcess_json1_Test() {
"
\"
dc
\"
:
\"
lga
\"
"
" }"
" }"
"]"
};
"]"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -159,7 +159,7 @@ int smlProcess_json2_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -227,7 +227,7 @@ int smlProcess_json3_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -286,7 +286,7 @@ int smlProcess_json4_Test() {
"
\"
t9
\"
: false,"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
utils/test/c/tmqSim.c
浏览文件 @
b1663924
...
...
@@ -155,7 +155,7 @@ static void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s%s
%d
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s
\n
"
,
indent
,
"-p"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"producer thread number, default is 0"
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
...
...
@@ -238,7 +238,7 @@ void saveConfigToLogFile() {
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" expect rows: %
d
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
taosFprintfFile
(
g_fp
,
" expect rows: %
"
PRIx64
"
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
}
char
tmpString
[
128
];
...
...
@@ -263,11 +263,11 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp
();
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
dbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
cdbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]
);
tstrncpy
(
configDir
,
argv
[
++
i
],
PATH_MAX
);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
...
...
@@ -279,9 +279,9 @@ void parseArgument(int32_t argc, char* argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
g_stConfInfo
.
useSnapshot
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
char
tmpBuf
[
56
]
;
strcpy
(
tmpBuf
,
argv
[
++
i
]
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
char
tmpBuf
[
56
]
=
{
0
}
;
tstrncpy
(
tmpBuf
,
argv
[
++
i
],
sizeof
(
tmpBuf
)
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
}
else
if
(
strcmp
(
argv
[
i
],
"-x"
)
==
0
)
{
g_stConfInfo
.
numOfThread
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
...
...
@@ -294,6 +294,10 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
producerRate
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
g_stConfInfo
.
payloadLen
=
atol
(
argv
[
++
i
]);
if
(
g_stConfInfo
.
payloadLen
<=
0
||
g_stConfInfo
.
payloadLen
>
1024
*
1024
*
1024
){
pError
(
"%s calloc size is too large: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
...
...
@@ -354,8 +358,8 @@ void ltrim(char* str) {
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
int
retryCnt
=
10
;
int
code
;
TAOS_RES
*
pRes
;
int
code
=
0
;
TAOS_RES
*
pRes
=
NULL
;
while
(
retryCnt
--
)
{
pRes
=
taos_query
(
taos
,
command
);
...
...
@@ -363,10 +367,11 @@ int queryDB(TAOS* taos, char* command) {
if
(
code
!=
0
)
{
taosSsleep
(
1
);
taos_free_result
(
pRes
);
pRes
=
NULL
;
continue
;
}
taos_free_result
(
pRes
);
return
0
;
return
0
;
}
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
...
...
@@ -418,7 +423,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char
sqlStr
[
1100
]
=
{
0
};
if
(
strlen
(
buf
)
>
1024
)
{
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
strlen
(
buf
));
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
(
int
)
strlen
(
buf
));
taosCloseFile
(
&
g_fp
);
return
-
1
;
}
...
...
@@ -592,7 +597,7 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -644,7 +649,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -960,7 +965,7 @@ void parseConsumeInfo() {
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
stThreads
[
i
].
key
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
);
tstrncpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
,
sizeof
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
])
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
...
...
@@ -1268,25 +1273,26 @@ void* ombProduceThreadFunc(void* param) {
for
(
int
i
=
0
;
i
<
batchPerTblTimes
;
++
i
)
{
uint32_t
msgsOfSql
=
g_stConfInfo
.
batchSize
;
if
((
i
==
batchPerTblTimes
-
1
)
&&
(
0
!=
remainder
))
{
msgsOfSql
=
remainder
;
msgsOfSql
=
remainder
;
}
int
len
=
0
;
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"insert into %s values "
,
ctbName
);
for
(
int
j
=
0
;
j
<
msgsOfSql
;
j
++
)
{
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
sendMsgs
++
;
pInfo
->
totalProduceMsgs
++
;
}
totalMsgLen
+=
len
;
totalMsgLen
+=
len
;
pInfo
->
totalMsgsLen
+=
len
;
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
if
(
affectedRows
<
0
)
{
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
return
NULL
;
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
affectedRowsTotal
+=
affectedRows
;
...
...
@@ -1322,6 +1328,7 @@ void* ombProduceThreadFunc(void* param) {
printf
(
"affectedRowsTotal: %"
PRId64
"
\n
"
,
affectedRowsTotal
);
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
...
...
utils/test/c/tmq_taosx_ci.c
浏览文件 @
b1663924
...
...
@@ -663,7 +663,7 @@ void initLogFile() {
int
main
(
int
argc
,
char
*
argv
[])
{
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
){
strcpy
(
g_conf
.
dir
,
argv
[
++
i
]
);
tstrncpy
(
g_conf
.
dir
,
argv
[
++
i
],
sizeof
(
g_conf
.
dir
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
){
g_conf
.
snapShot
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
){
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录