Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
864111d8
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
864111d8
编写于
10月 19, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into fix/commit_txn
上级
74d025c7
b621f04e
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
58 addition
and
174 deletion
+58
-174
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+8
-2
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+28
-23
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-2
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+1
-1
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+2
-2
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+7
-134
utils/test/c/tmqDemo.c
utils/test/c/tmqDemo.c
+5
-5
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+1
-1
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
864111d8
...
@@ -2,7 +2,7 @@
...
@@ -2,7 +2,7 @@
# taos-tools
# taos-tools
ExternalProject_Add(taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
c64858f
GIT_TAG
9284147
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
#BUILD_IN_SOURCE TRUE
...
...
source/client/src/clientSml.c
浏览文件 @
864111d8
...
@@ -1372,8 +1372,14 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
...
@@ -1372,8 +1372,14 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
static
int32_t
smlKvTimeHashCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
static
int32_t
smlKvTimeHashCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
SHashObj
*
s1
=
*
(
SHashObj
**
)
key1
;
SHashObj
*
s1
=
*
(
SHashObj
**
)
key1
;
SHashObj
*
s2
=
*
(
SHashObj
**
)
key2
;
SHashObj
*
s2
=
*
(
SHashObj
**
)
key2
;
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
taosHashGet
(
s1
,
TS
,
TS_LEN
);
SSmlKv
**
kv1pp
=
(
SSmlKv
**
)
taosHashGet
(
s1
,
TS
,
TS_LEN
);
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
taosHashGet
(
s2
,
TS
,
TS_LEN
);
SSmlKv
**
kv2pp
=
(
SSmlKv
**
)
taosHashGet
(
s2
,
TS
,
TS_LEN
);
if
(
!
kv1pp
||
!
kv2pp
){
uError
(
"smlKvTimeHashCompare kv is null"
);
return
-
1
;
}
SSmlKv
*
kv1
=
*
kv1pp
;
SSmlKv
*
kv2
=
*
kv2pp
;
if
(
!
kv1
||
kv1
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
if
(
!
kv1
||
kv1
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv1"
);
uError
(
"smlKvTimeHashCompare kv1"
);
return
-
1
;
return
-
1
;
...
...
source/client/src/clientTmq.c
浏览文件 @
864111d8
...
@@ -210,6 +210,8 @@ typedef struct {
...
@@ -210,6 +210,8 @@ typedef struct {
typedef
struct
{
typedef
struct
{
SMqCommitCbParamSet
*
params
;
SMqCommitCbParamSet
*
params
;
STqOffset
*
pOffset
;
STqOffset
*
pOffset
;
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/
/*int32_t vgId;*/
}
SMqCommitCbParam
;
}
SMqCommitCbParam
;
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
tmq_conf_new
()
{
...
@@ -407,6 +409,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
...
@@ -407,6 +409,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
return
0
;
return
0
;
}
}
static
void
tmqCommitRspCountDown
(
SMqCommitCbParamSet
*
pParamSet
)
{
int32_t
waitingRspNum
=
atomic_sub_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
ASSERT
(
waitingRspNum
>=
0
);
if
(
waitingRspNum
==
0
)
{
tmqCommitDone
(
pParamSet
);
}
}
int32_t
tmqCommitCb
(
void
*
param
,
SDataBuf
*
pBuf
,
int32_t
code
)
{
int32_t
tmqCommitCb
(
void
*
param
,
SDataBuf
*
pBuf
,
int32_t
code
)
{
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
SMqCommitCbParamSet
*
pParamSet
=
(
SMqCommitCbParamSet
*
)
pParam
->
params
;
SMqCommitCbParamSet
*
pParamSet
=
(
SMqCommitCbParamSet
*
)
pParam
->
params
;
...
@@ -420,18 +430,13 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
...
@@ -420,18 +430,13 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
#endif
#endif
taosMemoryFree
(
pParam
->
pOffset
);
taosMemoryFree
(
pParam
->
pOffset
);
if
(
pBuf
->
pData
)
taosMemoryFree
(
pBuf
->
pData
);
taosMemoryFree
(
pBuf
->
pData
);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/
* pOffset->version);*/
// count down waiting rsp
tmqCommitRspCountDown
(
pParamSet
);
int32_t
waitingRspNum
=
atomic_sub_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
ASSERT
(
waitingRspNum
>=
0
);
if
(
waitingRspNum
==
0
)
{
tmqCommitDone
(
pParamSet
);
}
return
0
;
return
0
;
}
}
...
@@ -591,14 +596,10 @@ FAIL:
...
@@ -591,14 +596,10 @@ FAIL:
return
0
;
return
0
;
}
}
int32_t
tmqCommitInner
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
static
int32_t
tmqCommitConsumerImpl
(
tmq_t
*
tmq
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
void
*
userParam
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
if
(
msg
!=
NULL
)
{
return
tmqCommitMsgImpl
(
tmq
,
msg
,
async
,
userCb
,
userParam
);
}
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
if
(
pParamSet
==
NULL
)
{
if
(
pParamSet
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -646,33 +647,37 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
...
@@ -646,33 +647,37 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
}
}
}
}
// no request is sent
if
(
pParamSet
->
totalRspNum
==
0
)
{
if
(
pParamSet
->
totalRspNum
==
0
)
{
tsem_destroy
(
&
pParamSet
->
rspSem
);
tsem_destroy
(
&
pParamSet
->
rspSem
);
taosMemoryFree
(
pParamSet
);
taosMemoryFree
(
pParamSet
);
return
0
;
return
0
;
}
}
int32_t
waitingRspNum
=
atomic_sub_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
// count down since waiting rsp num init as 1
ASSERT
(
waitingRspNum
>=
0
);
tmqCommitRspCountDown
(
pParamSet
);
if
(
waitingRspNum
==
0
)
{
tmqCommitDone
(
pParamSet
);
}
if
(
!
async
)
{
if
(
!
async
)
{
tsem_wait
(
&
pParamSet
->
rspSem
);
tsem_wait
(
&
pParamSet
->
rspSem
);
code
=
pParamSet
->
rspErr
;
code
=
pParamSet
->
rspErr
;
tsem_destroy
(
&
pParamSet
->
rspSem
);
tsem_destroy
(
&
pParamSet
->
rspSem
);
taosMemoryFree
(
pParamSet
);
taosMemoryFree
(
pParamSet
);
}
#if 0
#if 0
if (!async) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif
#endif
}
return
0
;
return
code
;
}
int32_t
tmqCommitInner
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
if
(
msg
)
{
return
tmqCommitMsgImpl
(
tmq
,
msg
,
async
,
userCb
,
userParam
);
}
else
{
return
tmqCommitConsumerImpl
(
tmq
,
automatic
,
async
,
userCb
,
userParam
);
}
}
}
void
tmqAssignAskEpTask
(
void
*
param
,
void
*
tmrId
)
{
void
tmqAssignAskEpTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
864111d8
...
@@ -149,7 +149,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
...
@@ -149,7 +149,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
);
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
);
// tqRead
// tqRead
int32_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScan
Taosx
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
);
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
...
@@ -181,8 +181,8 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
...
@@ -181,8 +181,8 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t
tqOffsetCommitFile
(
STqOffsetStore
*
pStore
);
int32_t
tqOffsetCommitFile
(
STqOffsetStore
*
pStore
);
// tqSink
// tqSink
void
tq
TableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
void
tq
SinkToTableMerge
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
void
tq
TableSink1
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
void
tq
SinkToTablePipeline
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
// tqOffset
// tqOffset
char
*
tqOffsetBuildFName
(
const
char
*
path
,
int32_t
ver
);
char
*
tqOffsetBuildFName
(
const
char
*
path
,
int32_t
ver
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
864111d8
...
@@ -595,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -595,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitTaosxRsp
(
&
taosxRsp
,
pReq
);
tqInitTaosxRsp
(
&
taosxRsp
,
pReq
);
if
(
fetchOffsetNew
.
type
!=
TMQ_OFFSET__LOG
)
{
if
(
fetchOffsetNew
.
type
!=
TMQ_OFFSET__LOG
)
{
tqScan
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
&
fetchOffsetNew
);
tqScan
Taosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
&
fetchOffsetNew
);
if
(
metaRsp
.
metaRspLen
>
0
)
{
if
(
metaRsp
.
metaRspLen
>
0
)
{
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
...
@@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__TABLE
)
{
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tq
TableSink1
;
pTask
->
tbSink
.
tbSinkFunc
=
tq
SinkToTablePipeline
;
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
864111d8
...
@@ -123,7 +123,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
...
@@ -123,7 +123,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return
0
;
return
0
;
}
}
int32_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
pOffset
)
{
int32_t
tqScan
Taosx
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
pOffset
)
{
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
qTaskInfo_t
task
=
pExec
->
task
;
qTaskInfo_t
task
=
pExec
->
task
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
864111d8
...
@@ -284,7 +284,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
...
@@ -284,7 +284,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return
ret
;
return
ret
;
}
}
void
tq
TableSink1
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
void
tq
SinkToTablePipeline
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pBlocks
=
(
const
SArray
*
)
data
;
const
SArray
*
pBlocks
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
int64_t
suid
=
pTask
->
tbSink
.
stbUid
;
int64_t
suid
=
pTask
->
tbSink
.
stbUid
;
...
@@ -528,7 +528,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
...
@@ -528,7 +528,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
taosArrayDestroy
(
tagArray
);
taosArrayDestroy
(
tagArray
);
}
}
void
tq
TableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
void
tq
SinkToTableMerge
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
)
{
const
SArray
*
pRes
=
(
const
SArray
*
)
data
;
const
SArray
*
pRes
=
(
const
SArray
*
)
data
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
SVnode
*
pVnode
=
(
SVnode
*
)
vnode
;
SBatchDeleteReq
deleteReq
=
{
0
};
SBatchDeleteReq
deleteReq
=
{
0
};
...
...
utils/test/c/sml_test.c
浏览文件 @
864111d8
...
@@ -102,27 +102,8 @@ int smlProcess_json1_Test() {
...
@@ -102,27 +102,8 @@ int smlProcess_json1_Test() {
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
const
char
*
sql
[]
=
{
"["
"[{
\"
metric
\"
:
\"
sys.cpu.nice
\"
,
\"
timestamp
\"
:0,
\"
value
\"
:18,
\"
tags
\"
:{
\"
host
\"
:
\"
web01
\"
,
\"
id
\"
:
\"
t1
\"
,
\"
dc
\"
:
\"
lga
\"
}},{
\"
metric
\"
:
\"
sys.cpu.nice
\"
,
\"
timestamp
\"
:1662344042,
\"
value
\"
:9,
\"
tags
\"
:{
\"
host
\"
:
\"
web02
\"
,
\"
dc
\"
:
\"
lga
\"
}}]"
" {"
};
"
\"
metric
\"
:
\"
sys.cpu.nice
\"
,"
"
\"
timestamp
\"
: 0,"
"
\"
value
\"
: 18,"
"
\"
tags
\"
: {"
"
\"
host
\"
:
\"
web01
\"
,"
"
\"
id
\"
:
\"
t1
\"
,"
"
\"
dc
\"
:
\"
lga
\"
"
" }"
" },"
" {"
"
\"
metric
\"
:
\"
sys.cpu.nice
\"
,"
"
\"
timestamp
\"
: 1662344042,"
"
\"
value
\"
: 9,"
"
\"
tags
\"
: {"
"
\"
host
\"
:
\"
web02
\"
,"
"
\"
dc
\"
:
\"
lga
\"
"
" }"
" }"
"]"
,};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
@@ -143,28 +124,8 @@ int smlProcess_json2_Test() {
...
@@ -143,28 +124,8 @@ int smlProcess_json2_Test() {
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
const
char
*
sql
[]
=
{
"{"
"{
\"
metric
\"
:
\"
meter_current0
\"
,
\"
timestamp
\"
:{
\"
value
\"
:1662344042,
\"
type
\"
:
\"
s
\"
},
\"
value
\"
:{
\"
value
\"
:10.3,
\"
type
\"
:
\"
i64
\"
},
\"
tags
\"
:{
\"
groupid
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
bigint
\"
},
\"
location
\"
:{
\"
value
\"
:
\"
北京
\"
,
\"
type
\"
:
\"
binary
\"
},
\"
id
\"
:
\"
d1001
\"
}}"
"
\"
metric
\"
:
\"
meter_current0
\"
,"
};
"
\"
timestamp
\"
: {"
"
\"
value
\"
: 1662344042,"
"
\"
type
\"
:
\"
s
\"
"
" },"
"
\"
value
\"
: {"
"
\"
value
\"
: 10.3,"
"
\"
type
\"
:
\"
i64
\"
"
" },"
"
\"
tags
\"
: {"
"
\"
groupid
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
bigint
\"
"
" },"
"
\"
location
\"
: { "
"
\"
value
\"
:
\"
北京
\"
,"
"
\"
type
\"
:
\"
binary
\"
"
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
,};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
@@ -185,56 +146,7 @@ int smlProcess_json3_Test() {
...
@@ -185,56 +146,7 @@ int smlProcess_json3_Test() {
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
const
char
*
sql
[]
=
{
"{"
"{
\"
metric
\"
:
\"
meter_current1
\"
,
\"
timestamp
\"
:{
\"
value
\"
:1662344042,
\"
type
\"
:
\"
s
\"
},
\"
value
\"
:{
\"
value
\"
:10.3,
\"
type
\"
:
\"
i64
\"
},
\"
tags
\"
:{
\"
t1
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
bigint
\"
},
\"
t2
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
int
\"
},
\"
t3
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
i16
\"
},
\"
t4
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
i8
\"
},
\"
t5
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
f32
\"
},
\"
t6
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
double
\"
},
\"
t7
\"
:{
\"
value
\"
:
\"
8323
\"
,
\"
type
\"
:
\"
binary
\"
},
\"
t8
\"
:{
\"
value
\"
:
\"
北京
\"
,
\"
type
\"
:
\"
nchar
\"
},
\"
t9
\"
:{
\"
value
\"
:true,
\"
type
\"
:
\"
bool
\"
},
\"
id
\"
:
\"
d1001
\"
}}"
};
"
\"
metric
\"
:
\"
meter_current1
\"
,"
"
\"
timestamp
\"
: {"
"
\"
value
\"
: 1662344042,"
"
\"
type
\"
:
\"
s
\"
"
" },"
"
\"
value
\"
: {"
"
\"
value
\"
: 10.3,"
"
\"
type
\"
:
\"
i64
\"
"
" },"
"
\"
tags
\"
: {"
"
\"
t1
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
bigint
\"
"
" },"
"
\"
t2
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
int
\"
"
" },"
"
\"
t3
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
i16
\"
"
" },"
"
\"
t4
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
i8
\"
"
" },"
"
\"
t5
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
f32
\"
"
" },"
"
\"
t6
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
double
\"
"
" },"
"
\"
t7
\"
: { "
"
\"
value
\"
:
\"
8323
\"
,"
"
\"
type
\"
:
\"
binary
\"
"
" },"
"
\"
t8
\"
: { "
"
\"
value
\"
:
\"
北京
\"
,"
"
\"
type
\"
:
\"
nchar
\"
"
" },"
"
\"
t9
\"
: { "
"
\"
value
\"
: true,"
"
\"
type
\"
:
\"
bool
\"
"
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
,};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
@@ -255,47 +167,8 @@ int smlProcess_json4_Test() {
...
@@ -255,47 +167,8 @@ int smlProcess_json4_Test() {
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
const
char
*
sql
[]
=
{
"{"
"{
\"
metric
\"
:
\"
meter_current2
\"
,
\"
timestamp
\"
:{
\"
value
\"
:1662344042000,
\"
type
\"
:
\"
ms
\"
},
\"
value
\"
:
\"
ni
\"
,
\"
tags
\"
:{
\"
t1
\"
:{
\"
value
\"
:20,
\"
type
\"
:
\"
i64
\"
},
\"
t2
\"
:{
\"
value
\"
:25,
\"
type
\"
:
\"
i32
\"
},
\"
t3
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
smallint
\"
},
\"
t4
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
tinyint
\"
},
\"
t5
\"
:{
\"
value
\"
:2,
\"
type
\"
:
\"
float
\"
},
\"
t6
\"
:{
\"
value
\"
:0.2,
\"
type
\"
:
\"
f64
\"
},
\"
t7
\"
:
\"
nsj
\"
,
\"
t8
\"
:{
\"
value
\"
:
\"
北京
\"
,
\"
type
\"
:
\"
nchar
\"
},
\"
t9
\"
:false,
\"
id
\"
:
\"
d1001
\"
}}"
"
\"
metric
\"
:
\"
meter_current2
\"
,"
};
"
\"
timestamp
\"
: {"
"
\"
value
\"
: 1662344042000,"
"
\"
type
\"
:
\"
ms
\"
"
" },"
"
\"
value
\"
:
\"
ni
\"
,"
"
\"
tags
\"
: {"
"
\"
t1
\"
: { "
"
\"
value
\"
: 20,"
"
\"
type
\"
:
\"
i64
\"
"
" },"
"
\"
t2
\"
: { "
"
\"
value
\"
: 25,"
"
\"
type
\"
:
\"
i32
\"
"
" },"
"
\"
t3
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
smallint
\"
"
" },"
"
\"
t4
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
tinyint
\"
"
" },"
"
\"
t5
\"
: { "
"
\"
value
\"
: 2,"
"
\"
type
\"
:
\"
float
\"
"
" },"
"
\"
t6
\"
: { "
"
\"
value
\"
: 0.2,"
"
\"
type
\"
:
\"
f64
\"
"
" },"
"
\"
t7
\"
:
\"
nsj
\"
,"
"
\"
t8
\"
: { "
"
\"
value
\"
:
\"
北京
\"
,"
"
\"
type
\"
:
\"
nchar
\"
"
" },"
"
\"
t9
\"
: false,"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
,};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
utils/test/c/tmqDemo.c
浏览文件 @
864111d8
...
@@ -130,15 +130,15 @@ void parseArgument(int32_t argc, char* argv[]) {
...
@@ -130,15 +130,15 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp
();
printHelp
();
exit
(
0
);
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
dbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]
);
tstrncpy
(
configDir
,
argv
[
++
i
],
PATH_MAX
);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
stbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
stbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
stbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
vnodeWalPath
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
vnodeWalPath
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
vnodeWalPath
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-f"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-f"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
resultFileName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
resultFileName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
resultFileName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
g_stConfInfo
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
g_stConfInfo
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
...
...
utils/test/c/tmqSim.c
浏览文件 @
864111d8
...
@@ -949,7 +949,7 @@ void parseConsumeInfo() {
...
@@ -949,7 +949,7 @@ void parseConsumeInfo() {
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
topicString
,
delim
);
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
topicString
,
delim
);
while
(
token
!=
NULL
)
{
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
// printf("%s\n", token );
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
],
token
);
tstrncpy
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
],
token
,
sizeof
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
])
);
ltrim
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
]);
ltrim
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
++
;
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
++
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录