Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
12319320
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
12319320
编写于
4月 22, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
bb63a149
9c4bb8b0
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
124 addition
and
59 deletion
+124
-59
example/src/tmq.c
example/src/tmq.c
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+0
-1
source/dnode/mnode/impl/src/mndPerfSchema.c
source/dnode/mnode/impl/src/mndPerfSchema.c
+2
-2
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+3
-2
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+18
-34
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+0
-1
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+2
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+40
-5
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+42
-2
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+16
-11
未找到文件。
example/src/tmq.c
浏览文件 @
12319320
...
...
@@ -103,6 +103,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
include/libs/catalog/catalog.h
浏览文件 @
12319320
...
...
@@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/
int32_t
catalogGetAllMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetQnodeList
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
SArray
*
pQnodeList
);
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCatalog
,
SSTableMetaVersion
**
stables
,
uint32_t
*
num
);
...
...
source/dnode/mnode/impl/src/mndPerfSchema.c
浏览文件 @
12319320
...
...
@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = {
static
const
SPerfsTableSchema
topicSchema
[]
=
{
{.
name
=
"topic_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"db_name"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
/*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"sql"
,
.
bytes
=
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"row_len"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
/*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
};
static
const
SPerfsTableSchema
consumerSchema
[]
=
{
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
12319320
...
...
@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextStream
);
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
...
...
@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return
code
;
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
12319320
...
...
@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static
int32_t
mndProcessCreateTopicReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessDropTopicReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessDropTopicInRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndRetrieveTopic
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
int32_t
mndRetrieveTopic
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitTopic
(
SMnode
*
pMnode
)
{
...
...
@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_TOPIC
,
mndProcessDropTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TOPIC_RSP
,
mndProcessDropTopicInRsp
);
//
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndRetrieveTopic
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextTopic
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
...
...
@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return
0
;
}
static
int32_t
mndRetrieveTopic
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
static
int32_t
mndRetrieveTopic
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SMqTopicObj
*
pTopic
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pShow
->
db
);
if
(
pDb
==
NULL
)
return
0
;
tstrncpy
(
prefix
,
pShow
->
db
,
TSDB_DB_FNAME_LEN
);
strcat
(
prefix
,
TS_PATH_DELIMITER
);
int32_t
prefixLen
=
(
int32_t
)
strlen
(
prefix
);
while
(
numOfRows
<
rows
)
{
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pShow
->
pIter
,
(
void
**
)
&
pTopic
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
pTopic
->
dbUid
!=
pDb
->
uid
)
{
if
(
strncmp
(
pTopic
->
name
,
prefix
,
prefixLen
)
!=
0
)
{
mError
(
"Inconsistent topic data, name:%s, db:%s, dbUid:%"
PRIu64
,
pTopic
->
name
,
pDb
->
name
,
pDb
->
uid
);
}
int32_t
cols
=
0
;
sdbRelease
(
pSdb
,
pTopic
)
;
continue
;
}
char
topicName
[
TSDB_TOPIC_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
tstrncpy
(
&
topicName
[
VARSTR_HEADER_SIZE
],
pTopic
->
name
,
TSDB_TOPIC_NAME_LEN
)
;
varDataSetLen
(
topicName
,
strlen
(
&
topicName
[
VARSTR_HEADER_SIZE
]));
cols
=
0
;
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
topicName
,
false
);
char
topicName
[
TSDB_TOPIC_NAME_LEN
]
=
{
0
};
tstrncpy
(
topicName
,
pTopic
->
name
+
prefixLen
,
TSDB_TOPIC_NAME_LEN
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
topicName
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTopic
->
createTime
,
false
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pTopic
->
createTime
;
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
char
*
sql
=
taosMemoryCalloc
(
1
,
strlen
(
pTopic
->
sql
)
+
1
+
VARSTR_HEADER_SIZE
);
strcpy
(
&
sql
[
VARSTR_HEADER_SIZE
],
pTopic
->
sql
);
varDataSetLen
(
sql
,
strlen
(
&
sql
[
VARSTR_HEADER_SIZE
]));
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
sql
,
false
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pTopic
->
sql
,
pShow
->
bytes
[
cols
]);
cols
++
;
taosMemoryFree
(
sql
);
numOfRows
++
;
sdbRelease
(
pSdb
,
pTopic
);
}
mndReleaseDb
(
pMnode
,
pDb
);
pShow
->
numOfRows
+=
numOfRows
;
return
numOfRows
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
12319320
...
...
@@ -1326,7 +1326,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
// 1. set the parsed value from sql string
for
(
int
c
=
0
;
c
<
spd
->
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
spd
->
boundColumns
[
c
]
-
1
];
...
...
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
12319320
...
...
@@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
rsp
,
int32_t
code
);
int32_t
qwRegisterBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
12319320
...
...
@@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwRegisterBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwRegister
Query
BrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_QUERY
,
&
input
,
NULL
));
...
...
@@ -1285,23 +1285,51 @@ _return:
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHbLinkBroken
(
SQWorkerMgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
QW_ERR_RET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
if
(
qwMsg
->
connInfo
.
handle
==
sch
->
hbConnInfo
.
handle
)
{
tmsgReleaseHandle
(
sch
->
hbConnInfo
.
handle
,
TAOS_CONN_SERVER
);
sch
->
hbConnInfo
.
handle
=
NULL
;
sch
->
hbConnInfo
.
ahandle
=
NULL
;
QW_DLOG
(
"release hb handle due to connection broken, handle:%p"
,
qwMsg
->
connInfo
.
handle
);
}
else
{
QW_DLOG
(
"ignore hb connection broken, handle:%p, currentHandle:%p"
,
qwMsg
->
connInfo
.
handle
,
sch
->
hbConnInfo
.
handle
);
}
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHb
(
SQWorkerMgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
uint64_t
seqId
=
0
;
void
*
origHandle
=
NULL
;
memcpy
(
&
rsp
.
epId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
if
(
qwMsg
->
code
)
{
QW_RET
(
qwProcessHbLinkBroken
(
mgmt
,
qwMsg
,
req
));
}
QW_ERR_JRET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
QW_ERR_JRET
(
qwRegisterHbBrokenLinkArg
(
mgmt
,
req
->
sId
,
&
qwMsg
->
connInfo
));
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
if
(
sch
->
hbConnInfo
.
handle
)
{
tmsgReleaseHandle
(
sch
->
hbConnInfo
.
handle
,
TAOS_CONN_SERVER
);
}
memcpy
(
&
sch
->
hbConnInfo
,
&
qwMsg
->
connInfo
,
sizeof
(
qwMsg
->
connInfo
));
memcpy
(
&
sch
->
hbEpId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
...
...
@@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return:
memcpy
(
&
rsp
.
epId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
qwBuildAndSendHbRsp
(
&
qwMsg
->
connInfo
,
&
rsp
,
code
);
if
(
code
)
{
tmsgReleaseHandle
(
qwMsg
->
connInfo
.
handle
,
TAOS_CONN_SERVER
);
}
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_RET
(
TSDB_CODE_SUCCESS
);
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
12319320
...
...
@@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
}
int32_t
qwRegisterBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
int32_t
qwRegister
Query
BrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
...
...
@@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
)
{
SSchedulerHbReq
req
=
{
0
};
req
.
header
.
vgId
=
mgmt
->
nodeId
;
req
.
sId
=
sId
;
int32_t
msgSize
=
tSerializeSSchedulerHbReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
QW_SCH_ELOG
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
void
*
msg
=
rpcMallocCont
(
msgSize
);
if
(
NULL
==
msg
)
{
QW_SCH_ELOG
(
"calloc %d failed"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
tSerializeSSchedulerHbReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
QW_SCH_ELOG
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
taosMemoryFree
(
msg
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SRpcMsg
pMsg
=
{
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT
,
.
pCont
=
msg
,
.
contLen
=
sizeof
(
SSchedulerHbReq
),
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
};
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
...
...
@@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t
sId
=
req
.
sId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
QW_SCH_DLOG
(
"receive Hb msg due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
}
QW_SCH_DLOG
(
"processHb start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
QW_ERR_RET
(
qwProcessHb
(
mgmt
,
&
qwMsg
,
&
req
));
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
12319320
...
...
@@ -1123,20 +1123,20 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
GET_TYPED_DATA
(
timeUnit
,
int64_t
,
GET_PARAM_TYPE
(
&
pInput
[
2
]),
pInput
[
2
].
columnData
->
pData
);
}
char
*
input
[
2
];
for
(
int32_t
k
=
0
;
k
<
2
;
++
k
)
{
int32_t
type
=
GET_PARAM_TYPE
(
&
pInput
[
k
]);
if
(
type
!=
TSDB_DATA_TYPE_BIGINT
&&
type
!=
TSDB_DATA_TYPE_TIMESTAMP
&&
type
!=
TSDB_DATA_TYPE_BINARY
&&
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
TSDB_CODE_FAILED
;
int32_t
numOfRows
=
0
;
for
(
int32_t
i
=
0
;
i
<
inputNum
;
++
i
)
{
if
(
pInput
[
i
].
numOfRows
>
numOfRows
)
{
numOfRows
=
pInput
[
i
].
numOfRows
;
}
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
char
*
input
[
2
];
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
bool
hasNull
=
false
;
for
(
int32_t
k
=
0
;
k
<
2
;
++
k
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
)
;
continue
;
if
(
colDataIsNull_s
(
pInput
[
k
].
columnData
,
i
))
{
hasNull
=
true
;
break
;
}
int32_t
rowIdx
=
(
pInput
[
k
].
numOfRows
==
1
)
?
0
:
i
;
...
...
@@ -1178,6 +1178,11 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
}
if
(
hasNull
)
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
int64_t
result
=
(
timeVal
[
0
]
>=
timeVal
[
1
])
?
(
timeVal
[
0
]
-
timeVal
[
1
])
:
(
timeVal
[
1
]
-
timeVal
[
0
]);
...
...
@@ -1238,7 +1243,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppend
(
pOutput
->
columnData
,
i
,
(
char
*
)
&
result
,
false
);
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
pOutput
->
numOfRows
=
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录