Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
02c6af78
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
02c6af78
编写于
6月 23, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into dvvv
上级
58836d2d
89fe5df6
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
54 addition
and
39 deletion
+54
-39
examples/c/tmq.c
examples/c/tmq.c
+11
-3
include/client/taos.h
include/client/taos.h
+1
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+13
-15
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+18
-15
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-1
未找到文件。
examples/c/tmq.c
浏览文件 @
02c6af78
...
...
@@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) {
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
void
*
meta
;
int32_t
metaLen
;
tmq_get_raw_meta
(
msg
,
&
meta
,
&
metaLen
);
printf
(
"meta, len is %d
\n
"
,
metaLen
);
return
;
}
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
...
...
@@ -129,8 +137,8 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -191,7 +199,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"
tru
e"
);
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"
fals
e"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
include/client/taos.h
浏览文件 @
02c6af78
...
...
@@ -261,7 +261,7 @@ enum tmq_res_t {
typedef
enum
tmq_res_t
tmq_res_t
;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
const
void
**
raw_meta
,
int32_t
*
raw_meta_len
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
void
**
raw_meta
,
int32_t
*
raw_meta_len
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
...
...
source/client/src/clientMain.c
浏览文件 @
02c6af78
...
...
@@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
}
else
if
(
TD_RES_TMQ_META
(
res
))
{
return
NULL
;
}
else
{
// assert to avoid un-initialization error
ASSERT
(
0
);
...
...
source/client/src/tmq.c
浏览文件 @
02c6af78
...
...
@@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// handle meta rsp
int8_t
rspType
=
((
SMqRspHead
*
)
pMsg
->
pData
)
->
mqMsgType
;
if
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
}
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
);
if
(
pRspWrapper
==
NULL
)
{
...
...
@@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper
->
vgHandle
=
pVg
;
pRspWrapper
->
topicHandle
=
pTopic
;
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
if
(
rspType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqDataBlkRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
dataRsp
);
}
else
{
ASSERT
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
);
memcpy
(
&
pRspWrapper
->
metaRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqMetaRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
metaRsp
);
}
taosMemoryFree
(
pMsg
->
pData
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
,
pRspWrapper
->
dataRsp
.
rspOffset
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld
, type %d
"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
,
pRspWrapper
->
dataRsp
.
rspOffset
,
rspType
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqMetaRspObj
*
tmqBuildMetaRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqMetaRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqMetaRspObj
));
pRspObj
->
resType
=
RES_TYPE__TMQ
;
pRspObj
->
resType
=
RES_TYPE__TMQ
_META
;
tstrncpy
(
pRspObj
->
topic
,
pWrapper
->
topicHandle
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
pRspObj
->
db
,
pWrapper
->
topicHandle
->
db
,
TSDB_DB_FNAME_LEN
);
pRspObj
->
vgId
=
pWrapper
->
vgHandle
->
vgId
;
...
...
@@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return
0
;
}
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
timeout
,
bool
pollIfReset
)
{
void
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
timeout
,
bool
pollIfReset
)
{
while
(
1
)
{
SMqRspWrapper
*
rspWrapper
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
...
...
@@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
else
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
SMqPollRspWrapper
*
pollRspWrapper
=
(
SMqPollRspWrapper
*
)
rspWrapper
;
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
da
taRsp
.
head
.
epoch
==
consumerEpoch
)
{
if
(
pollRspWrapper
->
me
taRsp
.
head
.
epoch
==
consumerEpoch
)
{
SMqClientVg
*
pVg
=
pollRspWrapper
->
vgHandle
;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg
->
currentOffset
=
pollRspWrapper
->
da
taRsp
.
rspOffset
;
pVg
->
currentOffset
=
pollRspWrapper
->
me
taRsp
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
// build rsp
SMq
RspObj
*
pRsp
=
tmqBuild
RspFromWrapper
(
pollRspWrapper
);
SMq
MetaRspObj
*
pRsp
=
tmqBuildMeta
RspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
}
else
{
tscDebug
(
"msg discard since epoch mismatch: msg epoch %d, consumer epoch %d
\n
"
,
pollRspWrapper
->
da
taRsp
.
head
.
epoch
,
consumerEpoch
);
pollRspWrapper
->
me
taRsp
.
head
.
epoch
,
consumerEpoch
);
taosFreeQitem
(
pollRspWrapper
);
}
}
else
{
...
...
@@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
SMqRspObj
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
void
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
#if 0
tmqHandleAllDelayedTask(tmq);
...
...
@@ -1873,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return
NULL
;
}
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
const
void
**
raw_meta
,
int32_t
*
raw_meta_len
)
{
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
void
**
raw_meta
,
int32_t
*
raw_meta_len
)
{
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
*
raw_meta
=
pMetaRspObj
->
metaRsp
.
metaRsp
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
02c6af78
...
...
@@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
#if 1
if
(
pReq
->
useSnapshot
)
{
tqInfo
(
"retrieve using snapshot"
);
// TODO set ver into snapshot
int64_t
lastVer
=
walGetCommittedVer
(
pTq
->
pWal
);
if
(
rsp
.
reqOffset
<
lastVer
)
{
tqInfo
(
"retrieve using snapshot req offset %ld last ver %ld"
,
rsp
.
reqOffset
,
lastVer
);
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
rsp
,
workerId
);
if
(
rsp
.
blockNum
!=
0
)
{
rsp
.
withTbName
=
false
;
rsp
.
rspOffset
=
lastVer
;
tqInfo
(
"direct send by snapshot rsp offset %ld"
,
lastVer
);
tqInfo
(
"direct send by snapshot req offset %ld rsp offset %ld"
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
fetchOffset
=
lastVer
;
goto
SEND_RSP
;
}
}
...
...
@@ -304,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pHead
->
msgType
==
TDMT_VND_DROP_STB
||
pHead
->
msgType
==
TDMT_VND_CREATE_TABLE
||
pHead
->
msgType
==
TDMT_VND_ALTER_TABLE
||
pHead
->
msgType
==
TDMT_VND_DROP_TABLE
||
pHead
->
msgType
==
TDMT_VND_DROP_TTL_TABLE
);
// return
tqInfo
(
"fetch meta msg, ver: %ld, type: %d"
,
pHead
->
version
,
pHead
->
msgType
);
SMqMetaRsp
metaRsp
=
{
0
};
metaRsp
.
reqOffset
=
pReq
->
currentOffset
;
metaRsp
.
rspOffset
=
fetchOffset
;
...
...
@@ -387,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
epoch
=
-
1
;
pHandle
->
execHandle
.
subType
=
req
.
subType
;
pHandle
->
fetchMeta
=
req
.
withMeta
;
pHandle
->
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
02c6af78
...
...
@@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo {
SReadHandle
readHandle
;
uint64_t
tableUid
;
// queried super table uid
EStreamScanMode
scanMode
;
SOperatorInfo
*
p
OperatorDumy
;
SOperatorInfo
*
p
SnapshotReadOp
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
02c6af78
...
...
@@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
colDataAppend
(
pColInfoData
,
i
,
data
,
(
data
==
NULL
)
||
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
colDataAppend
(
pColInfoData
,
i
,
data
,
(
data
==
NULL
)
||
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
}
if
(
data
&&
(
pColInfoData
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
)
&&
p
!=
NULL
&&
...
...
@@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
if
(
!
needRead
)
{
return
false
;
}
STableScanInfo
*
pTableScanInfo
=
pInfo
->
p
OperatorDumy
->
info
;
STableScanInfo
*
pTableScanInfo
=
pInfo
->
p
SnapshotReadOp
->
info
;
pTableScanInfo
->
cond
.
twindows
[
0
]
=
win
;
pTableScanInfo
->
curTWinIdx
=
0
;
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
...
...
@@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
if
(
pResult
==
NULL
)
{
if
(
prepareDataScan
(
pInfo
))
{
// scan next window data
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
}
}
if
(
!
pResult
)
{
...
...
@@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
ASSERT
(
pBlock
->
info
.
numOfCols
==
pUpdateBlock
->
info
.
numOfCols
);
int32_t
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
pInfo
->
tsArrayIndex
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
p
OperatorDumy
,
pBlock
,
rowId
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
p
SnapshotReadOp
,
pBlock
,
rowId
);
int32_t
i
=
0
;
for
(;
i
<
size
;
i
++
)
{
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
+
pInfo
->
tsArrayIndex
);
uint64_t
id
=
getGroupId
(
pInfo
->
p
OperatorDumy
,
pBlock
,
rowId
);
uint64_t
id
=
getGroupId
(
pInfo
->
p
SnapshotReadOp
,
pBlock
,
rowId
);
if
(
pInfo
->
groupId
!=
id
)
{
break
;
}
...
...
@@ -1063,7 +1064,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_FROM_SNAPSHOT
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
if
(
pResult
)
{
return
pResult
->
info
.
rows
>
0
?
pResult
:
NULL
;
}
...
...
@@ -1135,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
}
else
{
pInfo
->
pUpdateInfo
=
NULL
;
}
pInfo
->
p
OperatorDumy
=
pTableScanDummy
;
pInfo
->
p
SnapshotReadOp
=
pTableScanDummy
;
pInfo
->
interval
=
pSTInfo
->
interval
;
pInfo
->
readHandle
=
*
pHandle
;
...
...
@@ -1557,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
int32_t
msgType
=
(
strcasecmp
(
name
,
TSDB_INS_TABLE_DNODE_VARIABLES
)
==
0
)
?
TDMT_DND_SYSTABLE_RETRIEVE
:
TDMT_MND_SYSTABLE_RETRIEVE
;
int32_t
msgType
=
(
strcasecmp
(
name
,
TSDB_INS_TABLE_DNODE_VARIABLES
)
==
0
)
?
TDMT_DND_SYSTABLE_RETRIEVE
:
TDMT_MND_SYSTABLE_RETRIEVE
;
pMsgSendInfo
->
param
=
pOperator
;
pMsgSendInfo
->
msgInfo
.
pData
=
buf1
;
...
...
@@ -1843,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
else
{
data
=
(
char
*
)
p
;
}
colDataAppend
(
pDst
,
count
,
data
,
(
data
==
NULL
)
||
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
colDataAppend
(
pDst
,
count
,
data
,
(
data
==
NULL
)
||
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
if
(
pDst
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
&&
p
!=
NULL
&&
IS_VAR_DATA_TYPE
(((
const
STagVal
*
)
p
)
->
type
)
&&
data
!=
NULL
)
{
...
...
@@ -1896,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto
_error
;
}
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
02c6af78
...
...
@@ -2238,7 +2238,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
}
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
ASSERT
(
pDest
->
info
.
capacity
>=
pSource
->
info
.
rows
);
// ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity
(
pDest
,
pSource
->
info
.
rows
);
clearUpdateDataBlock
(
pDest
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
0
);
SColumnInfoData
*
pSourceCol
=
taosArrayGet
(
pSource
->
pDataBlock
,
tsColIndex
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录