Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9627e810
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9627e810
编写于
4月 18, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(tmq): support seek offset.
上级
97da3405
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
190 addition
and
29 deletion
+190
-29
include/client/taos.h
include/client/taos.h
+8
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+180
-25
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-4
未找到文件。
include/client/taos.h
浏览文件 @
9627e810
...
...
@@ -262,6 +262,12 @@ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errst
DLL_EXPORT
const
char
*
tmq_err2str
(
int32_t
code
);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
typedef
struct
tmq_topic_assignment
{
int32_t
vgroupHandle
;
int64_t
currentOffset
;
int64_t
begin
;
int64_t
end
;
}
tmq_topic_assignment
;
DLL_EXPORT
int32_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
int32_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
...
...
@@ -270,6 +276,8 @@ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT
int32_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
int32_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
);
DLL_EXPORT
int32_t
tmq_get_topic_assignment
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
tmq_topic_assignment
**
assignment
,
int32_t
*
numOfAssignment
);
DLL_EXPORT
int32_t
tmq_offset_seek
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
int32_t
vgroupHandle
,
int64_t
offset
);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
...
...
source/client/src/clientTmq.c
浏览文件 @
9627e810
...
...
@@ -133,16 +133,22 @@ enum {
TMQ_DELAYED_TASK__COMMIT
,
};
typedef
struct
{
int64_t
pollCnt
;
int64_t
numOfRows
;
typedef
struct
SVgOffsetInfo
{
STqOffsetVal
committedOffset
;
STqOffsetVal
currentOffset
;
int32_t
vgId
;
int32_t
vgStatus
;
int32_t
vgSkipCnt
;
int64_t
emptyBlockReceiveTs
;
// once empty block is received, idle for ignoreCnt then start to poll data
SEpSet
epSet
;
int64_t
walVerBegin
;
int64_t
walVerEnd
;
}
SVgOffsetInfo
;
typedef
struct
{
int64_t
pollCnt
;
int64_t
numOfRows
;
SVgOffsetInfo
offsetInfo
;
int32_t
vgId
;
int32_t
vgStatus
;
int32_t
vgSkipCnt
;
// here used to mark the slow vgroups
int64_t
emptyBlockReceiveTs
;
// once empty block is received, idle for ignoreCnt then start to poll data
SEpSet
epSet
;
}
SMqClientVg
;
typedef
struct
{
...
...
@@ -441,7 +447,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
// if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
// tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
// " retry failed, ignore this commit. code:%s ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->
offsetInfo.
committedOffset.version,
// tstrerror(terrno), index + 1, numOfVgroups);
// }
// }
...
...
@@ -473,7 +479,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pOffset
->
val
=
pVg
->
currentOffset
;
pOffset
->
val
=
pVg
->
offsetInfo
.
currentOffset
;
int32_t
groupLen
=
strlen
(
tmq
->
groupId
);
memcpy
(
pOffset
->
subKey
,
tmq
->
groupId
,
groupLen
);
...
...
@@ -543,7 +549,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pOffset
->
val
);
char
commitBuf
[
80
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
committedOffset
);
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%"
PRIx64
,
tmq
->
consumerId
,
pOffset
->
subKey
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
pEp
->
fqdn
,
pEp
->
port
,
index
+
1
,
totalVgroups
,
pMsgSendInfo
->
requestId
);
...
...
@@ -632,7 +638,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p
}
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
pVg
->
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
currentOffset
,
&
pVg
->
committedOffset
))
{
if
(
pVg
->
offsetInfo
.
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
offsetInfo
.
currentOffset
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
code
=
doSendCommitMsg
(
tmq
,
pVg
,
pTopic
->
topicName
,
pParamSet
,
j
,
numOfVgroups
);
// failed to commit, callback user function directly.
...
...
@@ -673,20 +679,20 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
for
(
int32_t
j
=
0
;
j
<
numOfVgroups
;
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
pVg
->
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
currentOffset
,
&
pVg
->
committedOffset
))
{
if
(
pVg
->
offsetInfo
.
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
offsetInfo
.
currentOffset
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
int32_t
code
=
doSendCommitMsg
(
tmq
,
pVg
,
pTopic
->
topicName
,
pParamSet
,
j
,
numOfVgroups
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d offset:%"
PRId64
" failed, code:%s ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
committedOffset
.
version
,
tstrerror
(
terrno
),
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
offsetInfo
.
committedOffset
.
version
,
tstrerror
(
terrno
),
j
+
1
,
numOfVgroups
);
continue
;
}
// update the offset value.
pVg
->
committedOffset
=
pVg
->
currentOffset
;
pVg
->
offsetInfo
.
committedOffset
=
pVg
->
offsetInfo
.
currentOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d, no commit, current:%"
PRId64
", ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
currentOffset
.
version
,
j
+
1
,
numOfVgroups
);
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
offsetInfo
.
currentOffset
.
version
,
j
+
1
,
numOfVgroups
);
}
}
}
...
...
@@ -1398,7 +1404,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
currentOffset
=
offsetNew
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
,
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
...
...
@@ -1407,6 +1412,10 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.
numOfRows
=
numOfRows
,
};
clientVg
.
offsetInfo
.
currentOffset
=
offsetNew
;
clientVg
.
offsetInfo
.
committedOffset
=
offsetNew
;
clientVg
.
offsetInfo
.
walVerBegin
=
-
1
;
clientVg
.
offsetInfo
.
walVerEnd
=
-
1
;
taosArrayPush
(
pTopic
->
vgs
,
&
clientVg
);
}
}
...
...
@@ -1456,11 +1465,11 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
makeTopicVgroupKey
(
vgKey
,
pTopicCur
->
topicName
,
pVgCur
->
vgId
);
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pVgCur
->
currentOffset
);
tFormatOffset
(
buf
,
80
,
&
pVgCur
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
", epoch:%d vgId:%d vgKey:%s, offset:%s"
,
tmq
->
consumerId
,
epoch
,
pVgCur
->
vgId
,
vgKey
,
buf
);
SVgroupSaveInfo
info
=
{.
offset
=
pVgCur
->
currentOffset
,
.
numOfRows
=
pVgCur
->
numOfRows
};
SVgroupSaveInfo
info
=
{.
offset
=
pVgCur
->
offsetInfo
.
currentOffset
,
.
numOfRows
=
pVgCur
->
numOfRows
};
taosHashPut
(
pVgOffsetHashMap
,
vgKey
,
strlen
(
vgKey
),
&
info
,
sizeof
(
SVgroupSaveInfo
));
}
}
...
...
@@ -1557,7 +1566,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq
->
timeout
=
timeout
;
pReq
->
epoch
=
tmq
->
epoch
;
/*pReq->currentOffset = reqOffset;*/
pReq
->
reqOffset
=
pVg
->
currentOffset
;
pReq
->
reqOffset
=
pVg
->
offsetInfo
.
currentOffset
;
pReq
->
head
.
vgId
=
pVg
->
vgId
;
pReq
->
useSnapshot
=
tmq
->
useSnapshot
;
pReq
->
reqId
=
generateRequestId
();
...
...
@@ -1681,7 +1690,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
80
];
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
currentOffset
);
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%"
PRIx64
,
pTmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pTmq
->
epoch
,
offsetFormatBuf
,
req
.
reqId
);
...
...
@@ -1798,7 +1807,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
// update the local offset value only for the returned values.
pVg
->
currentOffset
=
pDataRsp
->
rspOffset
;
pVg
->
offsetInfo
.
currentOffset
=
pDataRsp
->
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
char
buf
[
80
];
...
...
@@ -1835,7 +1844,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if
(
pollRspWrapper
->
metaRsp
.
head
.
epoch
==
consumerEpoch
)
{
SMqClientVg
*
pVg
=
pollRspWrapper
->
vgHandle
;
pVg
->
currentOffset
=
pollRspWrapper
->
metaRsp
.
rspOffset
;
pVg
->
offsetInfo
.
currentOffset
=
pollRspWrapper
->
metaRsp
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
// build rsp
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
...
...
@@ -1853,7 +1862,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if
(
pollRspWrapper
->
taosxRsp
.
head
.
epoch
==
consumerEpoch
)
{
SMqClientVg
*
pVg
=
pollRspWrapper
->
vgHandle
;
pVg
->
currentOffset
=
pollRspWrapper
->
taosxRsp
.
rspOffset
;
pVg
->
offsetInfo
.
currentOffset
=
pollRspWrapper
->
taosxRsp
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
if
(
pollRspWrapper
->
taosxRsp
.
blockNum
==
0
)
{
...
...
@@ -1879,7 +1888,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq
->
totalRows
+=
numOfRows
;
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pVg
->
currentOffset
);
tFormatOffset
(
buf
,
80
,
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
" total:%"
PRId64
" reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
...
...
@@ -2323,4 +2332,150 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
}
return
NULL
;
}
static
SMqClientTopic
*
getTopicByName
(
tmq_t
*
tmq
,
const
char
*
pTopicName
)
{
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
for
(
int32_t
i
=
0
;
i
<
numOfTopics
;
++
i
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
if
(
strcmp
(
pTopic
->
topicName
,
pTopicName
)
!=
0
)
{
continue
;
}
return
pTopic
;
}
tscError
(
"consumer:0x%"
PRIx64
", failed to find topic:%s"
,
tmq
->
consumerId
,
pTopicName
);
return
NULL
;
}
int32_t
tmq_get_topic_assignment
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
tmq_topic_assignment
**
assignment
,
int32_t
*
numOfAssignment
)
{
*
numOfAssignment
=
0
;
*
assignment
=
NULL
;
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
pTopicName
);
if
(
pTopic
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
// in case of snapshot is opened, no valid offset will return
*
numOfAssignment
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
tmq_topic_assignment
*
pAssignment
=
&
(
*
assignment
)[
j
];
if
(
pClientVg
->
offsetInfo
.
currentOffset
.
type
==
TMQ_OFFSET__LOG
)
{
pAssignment
->
currentOffset
=
pClientVg
->
offsetInfo
.
currentOffset
.
version
;
}
else
{
pAssignment
->
currentOffset
=
0
;
}
pAssignment
->
begin
=
pClientVg
->
offsetInfo
.
walVerBegin
;
pAssignment
->
end
=
pClientVg
->
offsetInfo
.
walVerEnd
;
pAssignment
->
vgroupHandle
=
pClientVg
->
vgId
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tmq_offset_seek
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
int32_t
vgroupHandle
,
int64_t
offset
)
{
if
(
tmq
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
pTopicName
);
if
(
pTopic
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
SMqClientVg
*
pVg
=
NULL
;
int32_t
numOfVgs
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int32_t
i
=
0
;
i
<
numOfVgs
;
++
i
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
i
);
if
(
pClientVg
->
vgId
==
vgroupHandle
)
{
pVg
=
pClientVg
;
break
;
}
}
if
(
pVg
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
if
(
offset
<
pVg
->
offsetInfo
.
walVerBegin
||
offset
>
pVg
->
offsetInfo
.
walVerEnd
)
{
return
TSDB_CODE_INVALID_PARA
;
}
return
0
;
#if 0
// tmq_commit_sync(tmq, );
{
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
// pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return -1;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
int32_t i = 0;
for (; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, pTopicName) == 0) {
break;
}
}
if (i == numOfTopics) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
pTopicName, numOfTopics);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
break;
}
}
if (j == numOfVgroups) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
vgId, numOfVgroups, pTopicName);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
// failed to commit, callback user function directly.
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
} else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
}
#endif
}
\ No newline at end of file
source/common/src/tmsg.c
浏览文件 @
9627e810
...
...
@@ -7158,8 +7158,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
}
void
tDeleteSTaosxRsp
(
STaosxRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
blockDataLen
);
pRsp
->
blockDataLen
=
NULL
;
pRsp
->
blockDataLen
=
taosArrayDestroy
(
pRsp
->
blockDataLen
);
taosArrayDestroyP
(
pRsp
->
blockData
,
(
FDelete
)
taosMemoryFree
);
pRsp
->
blockData
=
NULL
;
taosArrayDestroyP
(
pRsp
->
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
...
...
@@ -7167,8 +7166,7 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
taosArrayDestroyP
(
pRsp
->
blockTbName
,
(
FDelete
)
taosMemoryFree
);
pRsp
->
blockTbName
=
NULL
;
taosArrayDestroy
(
pRsp
->
createTableLen
);
pRsp
->
createTableLen
=
NULL
;
pRsp
->
createTableLen
=
taosArrayDestroy
(
pRsp
->
createTableLen
);
taosArrayDestroyP
(
pRsp
->
createTableReq
,
(
FDelete
)
taosMemoryFree
);
pRsp
->
createTableReq
=
NULL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录