Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f7cc44d6
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f7cc44d6
编写于
5月 20, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into fix/hzcheng_3.0
上级
9d0116ea
9ca54b94
变更
29
展开全部
隐藏空白更改
内联
并排
Showing
29 changed file
with
1620 addition
and
381 deletion
+1620
-381
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+7
-0
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+1
-13
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+2
-7
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+6
-12
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+4
-10
source/dnode/mgmt/test/sut/src/client.cpp
source/dnode/mgmt/test/sut/src/client.cpp
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+57
-7
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+0
-10
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+5
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+33
-8
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+123
-45
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+109
-123
source/libs/index/src/indexFst.c
source/libs/index/src/indexFst.c
+13
-13
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+3
-3
source/libs/index/src/indexUtil.c
source/libs/index/src/indexUtil.c
+10
-7
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+2
-2
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+85
-47
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+0
-4
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+2
-10
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+0
-3
source/libs/transport/test/pushServer.c
source/libs/transport/test/pushServer.c
+0
-1
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+0
-7
source/libs/transport/test/rserver.c
source/libs/transport/test/rserver.c
+0
-1
source/libs/transport/test/syncClient.c
source/libs/transport/test/syncClient.c
+9
-16
source/libs/transport/test/transUT.cpp
source/libs/transport/test/transUT.cpp
+0
-6
tests/system-test/7-tmq/subscribeStb.py
tests/system-test/7-tmq/subscribeStb.py
+1112
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+33
-20
tools/shell/src/shellNettest.c
tools/shell/src/shellNettest.c
+0
-3
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
f7cc44d6
...
...
@@ -107,6 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
if
(
ref
==
0
)
{
taosMemoryFree
(
pDataSubmit
->
data
);
taosMemoryFree
(
pDataSubmit
->
dataRef
);
taosFreeQitem
(
pDataSubmit
);
}
}
...
...
@@ -279,6 +280,12 @@ typedef struct {
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
typedef
struct
{
SMsgHead
head
;
int64_t
streamId
;
int32_t
taskId
;
}
SStreamTaskRunReq
;
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
...
...
include/libs/transport/trpc.h
浏览文件 @
f7cc44d6
...
...
@@ -63,11 +63,6 @@ typedef struct SRpcMsg {
}
SRpcMsg
;
typedef
void
(
*
RpcCfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
rf
);
typedef
int
(
*
RpcAfp
)(
void
*
parent
,
char
*
tableId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
///
// // SRpcMsg code
// REDIERE,
// NOT READY, EpSet
typedef
bool
(
*
RpcRfp
)(
int32_t
code
);
typedef
struct
SRpcInit
{
...
...
@@ -80,18 +75,11 @@ typedef struct SRpcInit {
int
idleTime
;
// milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char
*
user
;
// user name
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
*
secret
;
// key for authentication
char
*
ckey
;
// ciphering key
char
*
user
;
// user name
// call back to process incoming msg, code shall be ignored by server app
RpcCfp
cfp
;
// call back to retrieve the client auth info, for server app only
RpcAfp
afp
;
// user defined retry func
RpcRfp
rfp
;
...
...
source/client/src/clientEnv.c
浏览文件 @
f7cc44d6
...
...
@@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
assert
(
pRequest
!=
NULL
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SInstanceSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
int32_t
currentInst
=
atomic_sub_fetch_64
((
int64_t
*
)
&
pActivity
->
currentRequests
,
1
);
...
...
@@ -91,7 +91,6 @@ static bool clientRpcRfp(int32_t code) {
}
}
// TODO refactor
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThread
)
{
SRpcInit
rpcInit
;
...
...
@@ -105,10 +104,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
auth
;
void
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pDnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to server"
);
...
...
@@ -318,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return
0
;
}
SConfig
*
pCfg
=
taosGetCfg
();
SConfig
*
pCfg
=
taosGetCfg
();
SConfigItem
*
pItem
=
NULL
;
switch
(
option
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
f7cc44d6
...
...
@@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -325,7 +325,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
int32_t
validateSversion
(
SRequestObj
*
pRequest
,
void
*
res
)
{
SArray
*
pArray
=
NULL
;
int32_t
code
=
0
;
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
)
{
SSubmitRsp
*
pRsp
=
(
SSubmitRsp
*
)
res
;
if
(
pRsp
->
nBlocks
<=
0
)
{
...
...
@@ -337,14 +337,13 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
nBlocks
;
++
i
)
{
SSubmitBlkRsp
*
blk
=
pRsp
->
pBlocks
+
i
;
STbSVersion
tbSver
=
{.
tbFName
=
blk
->
tblFName
,
.
sver
=
blk
->
sver
};
SSubmitBlkRsp
*
blk
=
pRsp
->
pBlocks
+
i
;
STbSVersion
tbSver
=
{.
tbFName
=
blk
->
tblFName
,
.
sver
=
blk
->
sver
};
taosArrayPush
(
pArray
,
&
tbSver
);
}
}
else
if
(
TDMT_VND_QUERY
==
pRequest
->
type
)
{
}
SCatalog
*
pCatalog
=
NULL
;
...
...
@@ -365,11 +364,10 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if
(
NULL
==
res
)
{
return
;
}
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
)
{
tFreeSSubmitRsp
((
SSubmitRsp
*
)
res
);
}
else
if
(
TDMT_VND_QUERY
==
pRequest
->
type
)
{
}
}
...
...
@@ -1022,7 +1020,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
SRpcInit
rpcInit
=
{
0
};
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
"_pwd"
),
strlen
(
"_pwd"
),
pass
);
rpcInit
.
label
=
"CHK"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
NULL
;
...
...
@@ -1030,9 +1027,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"_dnd"
;
rpcInit
.
ckey
=
"_key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
pass
;
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
clientRpc
==
NULL
)
{
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
f7cc44d6
...
...
@@ -49,9 +49,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static
void
dmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
int32_t
code
=
-
1
;
SRpcMsg
*
pMsg
=
NULL
;
SRpcMsg
*
pMsg
=
NULL
;
bool
needRelease
=
false
;
SDnodeHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
pRpc
->
msgType
)];
SMgmtWrapper
*
pWrapper
=
NULL
;
...
...
@@ -179,11 +179,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for
(
EDndNodeType
ntype
=
DNODE
;
ntype
<
NODE_END
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
SArray
*
pArray
=
(
*
pWrapper
->
func
.
getHandlesFp
)();
SArray
*
pArray
=
(
*
pWrapper
->
func
.
getHandlesFp
)();
if
(
pArray
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SMgmtHandle
*
pMgmt
=
taosArrayGet
(
pArray
,
i
);
SMgmtHandle
*
pMgmt
=
taosArrayGet
(
pArray
,
i
);
SDnodeHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
pMgmt
->
msgType
)];
if
(
pMgmt
->
needCheckVgId
)
{
pHandle
->
needCheckVgId
=
pMgmt
->
needCheckVgId
;
...
...
@@ -276,15 +276,9 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
INTERNAL_USER
;
rpcInit
.
ckey
=
INTERNAL_CKEY
;
rpcInit
.
spi
=
1
;
rpcInit
.
parent
=
pDnode
;
rpcInit
.
rfp
=
rpcRfp
;
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
INTERNAL_SECRET
),
strlen
(
INTERNAL_SECRET
),
pass
);
rpcInit
.
secret
=
pass
;
pTrans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pTrans
->
clientRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc client"
);
...
...
source/dnode/mgmt/test/sut/src/client.cpp
浏览文件 @
f7cc44d6
...
...
@@ -48,10 +48,10 @@ void TestClient::DoInit() {
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
30
*
1000
;
rpcInit
.
user
=
(
char
*
)
this
->
user
;
rpcInit
.
ckey
=
(
char
*
)
"key"
;
//
rpcInit.ckey = (char*)"key";
rpcInit
.
parent
=
this
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
rpcInit
.
spi
=
1
;
//
rpcInit.secret = (char*)secretEncrypt;
//
rpcInit.spi = 1;
clientRpc
=
rpcOpen
(
&
rpcInit
);
ASSERT
(
clientRpc
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f7cc44d6
...
...
@@ -112,19 +112,18 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if
(
pIter
==
NULL
)
break
;
pExec
=
(
STqExec
*
)
pIter
;
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
if
(
isAdd
)
{
continue
;
}
else
{
if
(
!
isAdd
)
{
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pExec
->
pDropTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
}
}
}
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
}
return
0
;
...
...
@@ -1059,6 +1058,57 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
return
0
;
}
int32_t
tqProcessStreamTriggerNew
(
STQ
*
pTq
,
SSubmitReq
*
data
)
{
SStreamDataSubmit
*
pSubmit
=
NULL
;
// build data
pSubmit
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmit
==
NULL
)
return
-
1
;
pSubmit
->
dataRef
=
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pSubmit
->
dataRef
==
NULL
)
goto
FAIL
;
*
pSubmit
->
dataRef
=
1
;
pSubmit
->
data
=
data
;
pSubmit
->
type
=
STREAM_INPUT__DATA_BLOCK
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
==
TASK_INPUT_TYPE__SUMBIT_BLOCK
)
{
streamEnqueueDataSubmit
(
pTask
,
pSubmit
);
// TODO cal back pressure
}
// check run
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pReq
=
taosMemoryMalloc
(
sizeof
(
SStreamTaskRunReq
));
if
(
pReq
==
NULL
)
continue
;
// TODO: do we need htonl?
pReq
->
head
.
vgId
=
pTq
->
pVnode
->
config
.
vgId
;
pReq
->
streamId
=
pTask
->
streamId
;
pReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
0
,
.
pCont
=
pReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
msg
);
}
}
streamDataSubmitRefDec
(
pSubmit
);
return
0
;
FAIL:
if
(
pSubmit
)
{
if
(
pSubmit
->
dataRef
)
{
taosMemoryFree
(
pSubmit
->
dataRef
);
}
taosFreeQitem
(
pSubmit
);
}
return
-
1
;
}
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
f7cc44d6
...
...
@@ -34,21 +34,11 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pReadHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
pReadHandle
->
pMsg
=
pMsg
;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
pReadHandle
->
msgIter
,
&
pReadHandle
->
pBlock
)
<
0
)
return
-
1
;
if
(
pReadHandle
->
pBlock
==
NULL
)
break
;
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
// pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
// pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
// pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
// pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
)
<
0
)
return
-
1
;
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
f7cc44d6
...
...
@@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getUniqueFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
uniqueFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
uniqueFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
uniqueFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunction
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
f7cc44d6
...
...
@@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateUnique
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
if
(
1
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pPara
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
QUERY_NODE_COLUMN
!=
nodeType
(
pPara
))
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The parameters of UNIQUE can only be columns"
);
}
pFunc
->
node
.
resType
=
((
SExprNode
*
)
pPara
)
->
resType
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateDiff
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
int32_t
paraLen
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
if
(
paraLen
==
0
||
paraLen
>
2
)
{
...
...
@@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
finalizeFunc
=
lastFinalize
},
{
.
name
=
"
diff
"
,
.
type
=
FUNCTION_TYPE_
DIFF
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translate
Diff
,
.
getEnvFunc
=
get
Diff
FuncEnv
,
.
initFunc
=
diff
FunctionSetup
,
.
processFunc
=
diff
Function
,
.
finalizeFunc
=
function
Finalize
.
name
=
"
unique
"
,
.
type
=
FUNCTION_TYPE_
UNIQUE
,
.
classification
=
FUNC_MGT_
AGG
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translate
Unique
,
.
getEnvFunc
=
get
Unique
FuncEnv
,
.
initFunc
=
unique
FunctionSetup
,
.
processFunc
=
unique
Function
,
.
finalizeFunc
=
unique
Finalize
},
{
.
name
=
"histogram"
,
...
...
@@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
processFunc
=
hllFunction
,
.
finalizeFunc
=
hllFinalize
},
{
.
name
=
"diff"
,
.
type
=
FUNCTION_TYPE_DIFF
,
.
classification
=
FUNC_MGT_NONSTANDARD_SQL_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateDiff
,
.
getEnvFunc
=
getDiffFuncEnv
,
.
initFunc
=
diffFunctionSetup
,
.
processFunc
=
diffFunction
,
.
finalizeFunc
=
functionFinalize
},
{
.
name
=
"state_count"
,
.
type
=
FUNCTION_TYPE_STATE_COUNT
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
f7cc44d6
...
...
@@ -28,12 +28,15 @@
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
#define UNIQUE_MAX_RESULT_SIZE (1024*1024*10)
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS-1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
typedef
struct
SSumRes
{
union
{
int64_t
isum
;
...
...
@@ -197,6 +200,20 @@ typedef struct STailInfo {
STailItem
**
pItems
;
}
STailInfo
;
typedef
struct
SUniqueItem
{
int64_t
timestamp
;
bool
isNull
;
char
data
[];
}
SUniqueItem
;
typedef
struct
SUniqueInfo
{
int32_t
numOfPoints
;
uint8_t
colType
;
int16_t
colBytes
;
SHashObj
*
pHash
;
char
pItems
[];
}
SUniqueInfo
;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
...
...
@@ -216,6 +233,18 @@ typedef struct STailInfo {
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
...
...
@@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t* d = (_t*)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
static
void
saveTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STuplePos
*
pPos
);
static
void
copyTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STuplePos
*
pPos
);
...
...
@@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return
pResInfo
->
numOfRes
;
}
bool
getUniqueFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SUniqueInfo
)
+
UNIQUE_MAX_RESULT_SIZE
;
return
true
;
}
bool
uniqueFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SUniqueInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pInfo
->
numOfPoints
=
0
;
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
if
(
pInfo
->
pHash
!=
NULL
)
{
taosHashClear
(
pInfo
->
pHash
);
}
else
{
pInfo
->
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
return
true
;
}
static
void
doUniqueAdd
(
SUniqueInfo
*
pInfo
,
char
*
data
,
TSKEY
ts
,
bool
isNull
)
{
int32_t
hashKeyBytes
=
IS_VAR_DATA_TYPE
(
pInfo
->
colType
)
?
varDataTLen
(
data
)
:
pInfo
->
colBytes
;
SUniqueItem
*
pHashItem
=
taosHashGet
(
pInfo
->
pHash
,
data
,
hashKeyBytes
);
if
(
pHashItem
==
NULL
)
{
int32_t
size
=
sizeof
(
SUniqueItem
)
+
pInfo
->
colBytes
;
SUniqueItem
*
pItem
=
(
SUniqueItem
*
)(
pInfo
->
pItems
+
pInfo
->
numOfPoints
*
size
);
pItem
->
timestamp
=
ts
;
memcpy
(
pItem
->
data
,
data
,
pInfo
->
colBytes
);
taosHashPut
(
pInfo
->
pHash
,
data
,
hashKeyBytes
,
(
char
*
)
pItem
,
sizeof
(
SUniqueItem
*
));
pInfo
->
numOfPoints
++
;
}
else
if
(
pHashItem
->
timestamp
>
ts
)
{
pHashItem
->
timestamp
=
ts
;
}
}
int32_t
uniqueFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SUniqueInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pTsOutput
=
pCtx
->
pTsOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
int32_t
startOffset
=
pCtx
->
offset
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
doUniqueAdd
(
pInfo
,
data
,
tsList
[
i
],
colDataIsNull_s
(
pInputCol
,
i
));
if
(
sizeof
(
SUniqueInfo
)
+
pInfo
->
numOfPoints
*
(
sizeof
(
SUniqueItem
)
+
pInfo
->
colBytes
)
>=
UNIQUE_MAX_RESULT_SIZE
)
{
taosHashCleanup
(
pInfo
->
pHash
);
return
0
;
}
}
//taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
//for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
// int32_t pos = startOffset + i;
// STailItem *pItem = pInfo->pItems[i];
// if (pItem->isNull) {
// colDataAppendNULL(pOutput, pos);
// } else {
// colDataAppend(pOutput, pos, pItem->data, false);
// }
//}
pResInfo
->
numOfRes
=
pInfo
->
numOfPoints
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
uniqueFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SUniqueInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
for
(
int32_t
i
=
0
;
i
<
pResInfo
->
numOfRes
;
++
i
)
{
SUniqueItem
*
pItem
=
(
SUniqueItem
*
)(
pInfo
->
pItems
+
i
*
(
sizeof
(
SUniqueItem
)
+
pInfo
->
colBytes
));
colDataAppend
(
pCol
,
i
,
pItem
->
data
,
false
);
//TODO: handle ts output
}
return
pResInfo
->
numOfRes
;
}
bool
getDiffFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SDiffInfo
);
return
true
;
...
...
@@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
default:
ASSERT
(
0
);
}
}
}
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
f7cc44d6
...
...
@@ -27,16 +27,16 @@
#include "trpc.h"
typedef
struct
SUdfdContext
{
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
uv_signal_t
intrSignal
;
char
listenPipeName
[
PATH_MAX
+
UDF_LISTEN_PIPE_NAME_LEN
+
2
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
void
*
clientRpc
;
SCorEpSet
mgmtEp
;
uv_mutex_t
udfsMutex
;
SHashObj
*
udfsHash
;
SHashObj
*
udfsHash
;
bool
printVersion
;
}
SUdfdContext
;
...
...
@@ -45,7 +45,7 @@ SUdfdContext global;
typedef
struct
SUdfdUvConn
{
uv_stream_t
*
client
;
char
*
inputBuf
;
char
*
inputBuf
;
int32_t
inputLen
;
int32_t
inputCap
;
int32_t
inputTotal
;
...
...
@@ -65,25 +65,25 @@ typedef struct SUdf {
uv_mutex_t
lock
;
uv_cond_t
condReady
;
char
name
[
TSDB_FUNC_NAME_LEN
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
char
name
[
TSDB_FUNC_NAME_LEN
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
int32_t
outputLen
;
int32_t
bufSize
;
char
path
[
PATH_MAX
];
char
path
[
PATH_MAX
];
uv_lib_t
lib
;
uv_lib_t
lib
;
TUdfScalarProcFunc
scalarProcFunc
;
TUdfScalarProcFunc
scalarProcFunc
;
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
TUdfAggFinishFunc
aggFinishFunc
;
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
TUdfAggFinishFunc
aggFinishFunc
;
TUdfInitFunc
initFunc
;
TUdfDestroyFunc
destroyFunc
;
TUdfInitFunc
initFunc
;
TUdfDestroyFunc
destroyFunc
;
}
SUdf
;
// TODO: add private udf structure.
...
...
@@ -98,9 +98,9 @@ typedef enum EUdfdRpcReqRspType {
typedef
struct
SUdfdRpcSendRecvInfo
{
EUdfdRpcReqRspType
rpcType
;
int32_t
code
;
void
*
param
;
uv_sem_t
resultSem
;
int32_t
code
;
void
*
param
;
uv_sem_t
resultSem
;
}
SUdfdRpcSendRecvInfo
;
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
...
...
@@ -136,7 +136,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
tDeserializeSRetrieveFuncRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
outputType
;
...
...
@@ -145,7 +145,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
pFuncInfo
->
name
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
...
...
@@ -168,11 +169,11 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_RETRIVE_FUNC
;
msgInfo
->
param
=
udf
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
...
...
@@ -194,7 +195,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
int32_t
udfdConnectToMnode
()
{
SConnectReq
connReq
=
{
0
};
connReq
.
connType
=
CONN_TYPE__UDFD
;
tstrncpy
(
connReq
.
app
,
"udfd"
,
sizeof
(
connReq
.
app
));
tstrncpy
(
connReq
.
app
,
"udfd"
,
sizeof
(
connReq
.
app
));
tstrncpy
(
connReq
.
user
,
TSDB_DEFAULT_USER
,
sizeof
(
connReq
.
user
));
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
...
...
@@ -203,7 +204,7 @@ int32_t udfdConnectToMnode() {
connReq
.
startTime
=
htobe64
(
taosGetTimestampMs
());
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSConnectReq
(
pReq
,
contLen
,
&
connReq
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
...
...
@@ -240,17 +241,17 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
char
initFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
initFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
initSuffix
=
"_init"
;
strcpy
(
initFuncName
,
udfName
);
strncat
(
initFuncName
,
initSuffix
,
strlen
(
initSuffix
));
uv_dlsym
(
&
udf
->
lib
,
initFuncName
,
(
void
**
)(
&
udf
->
initFunc
));
uv_dlsym
(
&
udf
->
lib
,
initFuncName
,
(
void
**
)(
&
udf
->
initFunc
));
char
destroyFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
destroyFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
destroySuffix
=
"_destroy"
;
strcpy
(
destroyFuncName
,
udfName
);
strncat
(
destroyFuncName
,
destroySuffix
,
strlen
(
destroySuffix
));
uv_dlsym
(
&
udf
->
lib
,
destroyFuncName
,
(
void
**
)(
&
udf
->
destroyFunc
));
uv_dlsym
(
&
udf
->
lib
,
destroyFuncName
,
(
void
**
)(
&
udf
->
destroyFunc
));
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_SCALAR
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
...
...
@@ -270,87 +271,86 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy
(
finishFuncName
,
processFuncName
,
strlen
(
processFuncName
));
strncat
(
finishFuncName
,
finishSuffix
,
strlen
(
finishSuffix
));
uv_dlsym
(
&
udf
->
lib
,
finishFuncName
,
(
void
**
)(
&
udf
->
aggFinishFunc
));
//TODO: merge
//
TODO: merge
}
return
0
;
}
void
udfdProcessSetupRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"setup request. seq num: %"
PRId64
", udf name: %s"
,
request
->
seqNum
,
request
->
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
->
setup
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
));
if
(
udfInHash
)
{
++
(
*
udfInHash
)
->
refCount
;
udf
=
*
udfInHash
;
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
else
{
SUdf
*
udfNew
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdf
));
udfNew
->
refCount
=
1
;
udfNew
->
state
=
UDF_STATE_INIT
;
uv_mutex_init
(
&
udfNew
->
lock
);
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
&
udfNew
,
sizeof
(
&
udfNew
));
uv_mutex_unlock
(
&
global
.
udfsMutex
);
void
udfdProcessSetupRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"setup request. seq num: %"
PRId64
", udf name: %s"
,
request
->
seqNum
,
request
->
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
->
setup
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
));
if
(
udfInHash
)
{
++
(
*
udfInHash
)
->
refCount
;
udf
=
*
udfInHash
;
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
else
{
SUdf
*
udfNew
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdf
));
udfNew
->
refCount
=
1
;
udfNew
->
state
=
UDF_STATE_INIT
;
uv_mutex_init
(
&
udfNew
->
lock
);
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
&
udfNew
,
sizeof
(
&
udfNew
));
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
uv_mutex_lock
(
&
udf
->
lock
);
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
code
=
udfdLoadUdf
(
setup
->
udfName
,
udf
);
if
(
udf
->
initFunc
)
{
udf
->
initFunc
();
}
uv_mutex_lock
(
&
udf
->
lock
);
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
code
=
udfdLoadUdf
(
setup
->
udfName
,
udf
);
if
(
udf
->
initFunc
)
{
udf
->
initFunc
();
}
udf
->
state
=
UDF_STATE_READY
;
uv_cond_broadcast
(
&
udf
->
condReady
);
uv_mutex_unlock
(
&
udf
->
lock
);
}
else
{
while
(
udf
->
state
!=
UDF_STATE_READY
)
{
uv_cond_wait
(
&
udf
->
condReady
,
&
udf
->
lock
);
}
uv_mutex_unlock
(
&
udf
->
lock
);
udf
->
state
=
UDF_STATE_READY
;
uv_cond_broadcast
(
&
udf
->
condReady
);
uv_mutex_unlock
(
&
udf
->
lock
);
}
else
{
while
(
udf
->
state
!=
UDF_STATE_READY
)
{
uv_cond_wait
(
&
udf
->
condReady
,
&
udf
->
lock
);
}
SUdfcFuncHandle
*
handle
=
taosMemoryMalloc
(
sizeof
(
SUdfcFuncHandle
));
handle
->
udf
=
udf
;
SUdfResponse
rsp
;
rsp
.
seqNum
=
request
->
seqNum
;
rsp
.
type
=
request
->
type
;
rsp
.
code
=
code
;
rsp
.
setupRsp
.
udfHandle
=
(
int64_t
)(
handle
);
rsp
.
setupRsp
.
outputType
=
udf
->
outputType
;
rsp
.
setupRsp
.
outputLen
=
udf
->
outputLen
;
rsp
.
setupRsp
.
bufSize
=
udf
->
bufSize
;
int32_t
len
=
encodeUdfResponse
(
NULL
,
&
rsp
);
rsp
.
msgLen
=
len
;
void
*
bufBegin
=
taosMemoryMalloc
(
len
);
void
*
buf
=
bufBegin
;
encodeUdfResponse
(
&
buf
,
&
rsp
);
uvUdf
->
output
=
uv_buf_init
(
bufBegin
,
len
);
taosMemoryFree
(
uvUdf
->
input
.
base
);
return
;
uv_mutex_unlock
(
&
udf
->
lock
);
}
SUdfcFuncHandle
*
handle
=
taosMemoryMalloc
(
sizeof
(
SUdfcFuncHandle
));
handle
->
udf
=
udf
;
SUdfResponse
rsp
;
rsp
.
seqNum
=
request
->
seqNum
;
rsp
.
type
=
request
->
type
;
rsp
.
code
=
code
;
rsp
.
setupRsp
.
udfHandle
=
(
int64_t
)(
handle
);
rsp
.
setupRsp
.
outputType
=
udf
->
outputType
;
rsp
.
setupRsp
.
outputLen
=
udf
->
outputLen
;
rsp
.
setupRsp
.
bufSize
=
udf
->
bufSize
;
int32_t
len
=
encodeUdfResponse
(
NULL
,
&
rsp
);
rsp
.
msgLen
=
len
;
void
*
bufBegin
=
taosMemoryMalloc
(
len
);
void
*
buf
=
bufBegin
;
encodeUdfResponse
(
&
buf
,
&
rsp
);
uvUdf
->
output
=
uv_buf_init
(
bufBegin
,
len
);
taosMemoryFree
(
uvUdf
->
input
.
base
);
return
;
}
void
udfdProcessCallRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
SUdfCallRequest
*
call
=
&
request
->
call
;
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
->
seqNum
,
call
->
callType
,
call
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdfResponse
response
=
{
0
};
SUdfResponse
*
rsp
=
&
response
;
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
->
seqNum
,
call
->
callType
,
call
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdfResponse
response
=
{
0
};
SUdfResponse
*
rsp
=
&
response
;
SUdfCallResponse
*
subRsp
=
&
rsp
->
callRsp
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
call
->
callType
)
{
switch
(
call
->
callType
)
{
case
TSDB_UDF_CALL_SCALA_PROC
:
{
SUdfColumn
output
=
{
0
};
...
...
@@ -363,9 +363,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break
;
}
case
TSDB_UDF_CALL_AGG_INIT
:
{
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
udf
->
aggStartFunc
(
&
outBuf
);
subRsp
->
resultBuf
=
outBuf
;
break
;
...
...
@@ -373,9 +371,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
case
TSDB_UDF_CALL_AGG_PROC
:
{
SUdfDataBlock
input
=
{
0
};
convertDataBlockToUdfDataBlock
(
&
call
->
block
,
&
input
);
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
code
=
udf
->
aggProcFunc
(
&
input
,
&
call
->
interBuf
,
&
outBuf
);
freeUdfInterBuf
(
&
call
->
interBuf
);
freeUdfDataDataBlock
(
&
input
);
...
...
@@ -384,9 +380,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break
;
}
case
TSDB_UDF_CALL_AGG_FIN
:
{
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
code
=
udf
->
aggFinishFunc
(
&
call
->
interBuf
,
&
outBuf
);
freeUdfInterBuf
(
&
call
->
interBuf
);
subRsp
->
resultBuf
=
outBuf
;
...
...
@@ -429,20 +423,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
default:
break
;
}
taosMemoryFree
(
uvUdf
->
input
.
base
);
return
;
}
void
udfdProcessTeardownRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
void
udfdProcessTeardownRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
SUdfTeardownRequest
*
teardown
=
&
request
->
teardown
;
fnInfo
(
"teardown. seq number: %"
PRId64
", handle:%"
PRIx64
,
request
->
seqNum
,
teardown
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
teardown
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
udf
->
refCount
--
;
...
...
@@ -568,7 +561,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void
udfdHandleRequest
(
SUdfdUvConn
*
conn
)
{
uv_work_t
*
work
=
taosMemoryMalloc
(
sizeof
(
uv_work_t
));
uv_work_t
*
work
=
taosMemoryMalloc
(
sizeof
(
uv_work_t
));
SUvUdfWork
*
udfWork
=
taosMemoryMalloc
(
sizeof
(
SUvUdfWork
));
udfWork
->
client
=
conn
->
client
;
udfWork
->
input
=
uv_buf_init
(
conn
->
inputBuf
,
conn
->
inputLen
);
...
...
@@ -653,11 +646,11 @@ static bool udfdRpcRfp(int32_t code) {
}
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
// init mnode ip set
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
mgmtEpSet
->
numOfEps
=
0
;
mgmtEpSet
->
inUse
=
0
;
...
...
@@ -694,7 +687,6 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return
0
;
}
int32_t
udfdOpenClientRpc
()
{
SRpcInit
rpcInit
=
{
0
};
rpcInit
.
label
=
"UDFD"
;
...
...
@@ -704,15 +696,9 @@ int32_t udfdOpenClientRpc() {
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
TSDB_DEFAULT_USER
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
parent
=
&
global
;
rpcInit
.
rfp
=
udfdRpcRfp
;
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
rpcInit
.
secret
=
pass
;
global
.
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
global
.
clientRpc
==
NULL
)
{
fnError
(
"failed to init dnode rpc client"
);
...
...
@@ -823,7 +809,7 @@ static int32_t udfdUvInit() {
return
0
;
}
static
void
udfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
static
void
udfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
...
...
@@ -883,7 +869,7 @@ int main(int argc, char *argv[]) {
int32_t
retryMnodeTimes
=
0
;
int32_t
code
=
0
;
while
(
retryMnodeTimes
++
<
TSDB_MAX_REPLICA
)
{
uv_sleep
(
500
*
(
1
<<
retryMnodeTimes
));
uv_sleep
(
500
*
(
1
<<
retryMnodeTimes
));
code
=
udfdConnectToMnode
();
if
(
code
==
0
)
{
break
;
...
...
source/libs/index/src/indexFst.c
浏览文件 @
f7cc44d6
...
...
@@ -99,7 +99,7 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
if
(
fstSliceIsEmpty
(
s
))
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
int32_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
nodes
->
stack
,
sz
);
assert
(
un
->
last
==
NULL
);
...
...
@@ -130,11 +130,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
uint64_t
fstUnFinishedNodesFindCommPrefix
(
FstUnFinishedNodes
*
node
,
FstSlice
bs
)
{
FstSlice
*
s
=
&
bs
;
size_t
ssz
=
taosArrayGetSize
(
node
->
stack
);
// stack size
int32_t
ssz
=
taosArrayGetSize
(
node
->
stack
);
// stack size
uint64_t
count
=
0
;
int32_t
lsz
;
// data len
uint8_t
*
data
=
fstSliceData
(
s
,
&
lsz
);
for
(
size
_t
i
=
0
;
i
<
ssz
&&
i
<
lsz
;
i
++
)
{
for
(
int32
_t
i
=
0
;
i
<
ssz
&&
i
<
lsz
;
i
++
)
{
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
node
->
stack
,
i
);
if
(
un
->
last
->
inp
==
data
[
i
])
{
count
++
;
...
...
@@ -147,8 +147,8 @@ uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes* node, FstSlice bs)
uint64_t
fstUnFinishedNodesFindCommPrefixAndSetOutput
(
FstUnFinishedNodes
*
node
,
FstSlice
bs
,
Output
in
,
Output
*
out
)
{
FstSlice
*
s
=
&
bs
;
size
_t
lsz
=
(
size_t
)(
s
->
end
-
s
->
start
+
1
);
// data len
size
_t
ssz
=
taosArrayGetSize
(
node
->
stack
);
// stack size
int32
_t
lsz
=
(
size_t
)(
s
->
end
-
s
->
start
+
1
);
// data len
int32
_t
ssz
=
taosArrayGetSize
(
node
->
stack
);
// stack size
*
out
=
in
;
uint64_t
i
=
0
;
for
(
i
=
0
;
i
<
lsz
&&
i
<
ssz
;
i
++
)
{
...
...
@@ -245,7 +245,7 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
return
;
}
void
fstStateCompileForAnyTrans
(
FstCountingWriter
*
w
,
CompiledAddr
addr
,
FstBuilderNode
*
node
)
{
size
_t
sz
=
taosArrayGetSize
(
node
->
trans
);
int32
_t
sz
=
taosArrayGetSize
(
node
->
trans
);
assert
(
sz
<=
256
);
uint8_t
tSize
=
0
;
...
...
@@ -253,7 +253,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
// finalOutput.is_zero()
bool
anyOuts
=
(
node
->
finalOutput
!=
0
);
for
(
size
_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int32
_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
tSize
=
TMAX
(
tSize
,
packDeltaSize
(
addr
,
t
->
addr
));
oSize
=
TMAX
(
oSize
,
packSize
(
t
->
out
));
...
...
@@ -301,7 +301,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
/// for (uint8_t i = 0; i < 256; i++) {
// index[i] = 255;
///}
for
(
size
_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int32
_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
index
[
t
->
inp
]
=
i
;
// fstPackDeltaIn(w, addr, t->addr, tSize);
...
...
@@ -731,7 +731,7 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
}
bool
fstNodeCompile
(
FstNode
*
node
,
void
*
w
,
CompiledAddr
lastAddr
,
CompiledAddr
addr
,
FstBuilderNode
*
builderNode
)
{
size
_t
sz
=
taosArrayGetSize
(
builderNode
->
trans
);
int32
_t
sz
=
taosArrayGetSize
(
builderNode
->
trans
);
assert
(
sz
<
256
);
if
(
sz
==
0
&&
builderNode
->
isFinal
&&
builderNode
->
finalOutput
==
0
)
{
return
true
;
...
...
@@ -959,8 +959,8 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished* unNode, O
if
(
FST_BUILDER_NODE_IS_FINAL
(
unNode
->
node
))
{
unNode
->
node
->
finalOutput
+=
out
;
}
size
_t
sz
=
taosArrayGetSize
(
unNode
->
node
->
trans
);
for
(
size
_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32
_t
sz
=
taosArrayGetSize
(
unNode
->
node
->
trans
);
for
(
int32
_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
trn
=
taosArrayGet
(
unNode
->
node
->
trans
,
i
);
trn
->
out
+=
out
;
}
...
...
@@ -1077,7 +1077,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
tOut
=
tOut
+
FST_NODE_FINAL_OUTPUT
(
root
);
}
for
(
size
_t
i
=
0
;
i
<
taosArrayGetSize
(
nodes
);
i
++
)
{
for
(
int32
_t
i
=
0
;
i
<
taosArrayGetSize
(
nodes
);
i
++
)
{
FstNode
**
node
=
(
FstNode
**
)
taosArrayGet
(
nodes
,
i
);
fstNodeDestroy
(
*
node
);
}
...
...
@@ -1352,7 +1352,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamState
s2
=
{.
node
=
nextNode
,
.
trans
=
0
,
.
out
=
{.
null
=
false
,
.
out
=
out
},
.
autState
=
nextState
};
taosArrayPush
(
sws
->
stack
,
&
s2
);
size_t
isz
=
taosArrayGetSize
(
sws
->
inp
);
int32_t
isz
=
taosArrayGetSize
(
sws
->
inp
);
uint8_t
*
buf
=
(
uint8_t
*
)
taosMemoryMalloc
(
isz
*
sizeof
(
uint8_t
));
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
f7cc44d6
...
...
@@ -116,7 +116,7 @@ TFileCache* tfileCacheCreate(const char* path) {
continue
;
}
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
(
int32_t
)
strlen
(
header
->
colName
)};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
...
...
@@ -230,7 +230,7 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, found table info in tindex, time cost: %"
PRIu64
"us"
,
tem
->
suid
,
tem
->
colName
,
tem
->
colVal
,
cost
);
ret
=
tfileReaderLoadTableIds
((
TFileReader
*
)
reader
,
offset
,
tr
->
total
);
ret
=
tfileReaderLoadTableIds
((
TFileReader
*
)
reader
,
(
int32_t
)
offset
,
tr
->
total
);
cost
=
taosGetTimestampUs
()
-
et
;
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, load all table info, time cost: %"
PRIu64
"us"
,
tem
->
suid
,
tem
->
colName
,
tem
->
colVal
,
cost
);
...
...
@@ -890,7 +890,7 @@ static int tfileWriteFooter(TFileWriter* write) {
char
buf
[
sizeof
(
tfileMagicNumber
)
+
1
]
=
{
0
};
void
*
pBuf
=
(
void
*
)
buf
;
taosEncodeFixedU64
((
void
**
)(
void
*
)
&
pBuf
,
tfileMagicNumber
);
int
nwrite
=
write
->
ctx
->
write
(
write
->
ctx
,
buf
,
strlen
(
buf
));
int
nwrite
=
write
->
ctx
->
write
(
write
->
ctx
,
buf
,
(
int32_t
)
strlen
(
buf
));
indexInfo
(
"tfile write footer size: %d"
,
write
->
ctx
->
size
(
write
->
ctx
));
assert
(
nwrite
==
sizeof
(
tfileMagicNumber
));
...
...
source/libs/index/src/indexUtil.c
浏览文件 @
f7cc44d6
...
...
@@ -37,14 +37,14 @@ static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
}
void
iIntersection
(
SArray
*
inters
,
SArray
*
final
)
{
int32_t
sz
=
taosArrayGetSize
(
inters
);
int32_t
sz
=
(
int32_t
)
taosArrayGetSize
(
inters
);
if
(
sz
<=
0
)
{
return
;
}
MergeIndex
*
mi
=
taosMemoryCalloc
(
sz
,
sizeof
(
MergeIndex
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
t
=
taosArrayGetP
(
inters
,
i
);
mi
[
i
].
len
=
taosArrayGetSize
(
t
);
mi
[
i
].
len
=
(
int32_t
)
taosArrayGetSize
(
t
);
mi
[
i
].
idx
=
0
;
}
...
...
@@ -70,7 +70,7 @@ void iIntersection(SArray *inters, SArray *final) {
taosMemoryFreeClear
(
mi
);
}
void
iUnion
(
SArray
*
inters
,
SArray
*
final
)
{
int32_t
sz
=
taosArrayGetSize
(
inters
);
int32_t
sz
=
(
int32_t
)
taosArrayGetSize
(
inters
);
if
(
sz
<=
0
)
{
return
;
}
...
...
@@ -82,7 +82,7 @@ void iUnion(SArray *inters, SArray *final) {
MergeIndex
*
mi
=
taosMemoryCalloc
(
sz
,
sizeof
(
MergeIndex
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
t
=
taosArrayGetP
(
inters
,
i
);
mi
[
i
].
len
=
taosArrayGetSize
(
t
);
mi
[
i
].
len
=
(
int32_t
)
taosArrayGetSize
(
t
);
mi
[
i
].
idx
=
0
;
}
while
(
1
)
{
...
...
@@ -117,8 +117,8 @@ void iUnion(SArray *inters, SArray *final) {
}
void
iExcept
(
SArray
*
total
,
SArray
*
except
)
{
int32_t
tsz
=
taosArrayGetSize
(
total
);
int32_t
esz
=
taosArrayGetSize
(
except
);
int32_t
tsz
=
(
int32_t
)
taosArrayGetSize
(
total
);
int32_t
esz
=
(
int32_t
)
taosArrayGetSize
(
except
);
if
(
esz
==
0
||
tsz
==
0
)
{
return
;
}
...
...
@@ -141,7 +141,10 @@ int uidCompare(const void *a, const void *b) {
// add more version compare
uint64_t
u1
=
*
(
uint64_t
*
)
a
;
uint64_t
u2
=
*
(
uint64_t
*
)
b
;
return
u1
-
u2
;
if
(
u1
==
u2
)
{
return
0
;
}
return
u1
<
u2
?
-
1
:
1
;
}
int
verdataCompare
(
const
void
*
a
,
const
void
*
b
)
{
SIdxVerdata
*
va
=
(
SIdxVerdata
*
)
a
;
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
f7cc44d6
...
...
@@ -48,7 +48,7 @@ class FstWriter {
class
FstReadMemory
{
public:
FstReadMemory
(
size
_t
size
,
const
std
::
string
&
fileName
=
"/tmp/tindex.tindex"
)
{
FstReadMemory
(
int32
_t
size
,
const
std
::
string
&
fileName
=
"/tmp/tindex.tindex"
)
{
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_w
=
fstCountingWriterCreate
(
_wc
);
_size
=
size
;
...
...
@@ -152,7 +152,7 @@ class FstReadMemory {
Fst
*
_fst
;
FstSlice
_s
;
WriterCtx
*
_wc
;
size_t
_size
;
int32_t
_size
;
};
#define L 100
...
...
source/libs/stream/src/tstream.c
浏览文件 @
f7cc44d6
...
...
@@ -247,6 +247,19 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
taosFreeQitem
(
data
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
resQ
->
type
=
STREAM_INPUT__DATA_BLOCK
;
resQ
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
goto
FAIL
;
}
}
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
...
...
@@ -298,62 +311,66 @@ int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// TODO dispatch guard
int8_t
outputStatus
=
atomic_load_8
(
&
pTask
->
outputStatus
);
if
(
outputStatus
==
TASK_OUTPUT_STATUS__NORMAL
)
{
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
return
-
1
;
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taos
HashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
)
);
taos
ArrayPush
(
pArray
,
pDataBlock
);
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
}
}
return
0
;
...
...
@@ -406,11 +423,32 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
return
0
;
}
int32_t
streamTaskProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
}
// continue dispatch
streamTaskSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
streamTaskExec2
(
pTask
,
pMsgCb
);
streamTaskSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRecoverReq
(
SStreamTask
*
pTask
,
char
*
msg
)
{
//
return
0
;
}
int32_t
streamTaskProcessRecoverRsp
(
SStreamTask
*
pTask
,
char
*
msg
)
{
//
return
0
;
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
f7cc44d6
...
...
@@ -183,9 +183,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"sync-io"
;
rpcInit
.
secret
=
"sync-io"
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
0
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
io
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
...
...
@@ -206,7 +203,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit
.
cfp
=
syncIOProcessRequest
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
syncIOAuth
;
rpcInit
.
parent
=
io
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
f7cc44d6
...
...
@@ -52,23 +52,15 @@ typedef struct {
int
idleTime
;
// milliseconds;
uint16_t
localPort
;
int8_t
connType
;
int64_t
index
;
char
label
[
TSDB_LABEL_LEN
];
char
user
[
TSDB_UNI_LEN
];
// meter ID
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
secret
[
TSDB_PASSWORD_LEN
];
// secret for the link
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
char
user
[
TSDB_UNI_LEN
];
// meter ID
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
bool
(
*
retry
)(
int32_t
code
);
int
index
;
int32_t
refCount
;
void
*
parent
;
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
SHashObj
*
hash
;
// handle returned by hash utility
void
*
tcphandle
;
// returned handle from TCP initialization
TdThreadMutex
mutex
;
}
SRpcInfo
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
f7cc44d6
...
...
@@ -69,9 +69,6 @@ void* rpcOpen(const SRpcInit* pInit) {
if
(
pInit
->
user
)
{
memcpy
(
pRpc
->
user
,
pInit
->
user
,
strlen
(
pInit
->
user
));
}
if
(
pInit
->
secret
)
{
memcpy
(
pRpc
->
secret
,
pInit
->
secret
,
strlen
(
pInit
->
secret
));
}
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
...
...
source/libs/transport/test/pushServer.c
浏览文件 @
f7cc44d6
...
...
@@ -134,7 +134,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
...
...
source/libs/transport/test/rclient.c
浏览文件 @
f7cc44d6
...
...
@@ -118,9 +118,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
secret
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcDebugFlag
=
131
;
...
...
@@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-u"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
user
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
secret
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-spi"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
spi
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
...
...
@@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
printf
(
" [-n requests]: number of requests per thread, default is:%d
\n
"
,
numOfReqs
);
printf
(
" [-o compSize]: compression message size, default is:%d
\n
"
,
tsCompressMsgSize
);
printf
(
" [-u user]: user name for the connection, default is:%s
\n
"
,
rpcInit
.
user
);
printf
(
" [-k secret]: password for the connection, default is:%s
\n
"
,
rpcInit
.
secret
);
printf
(
" [-spi SPI]: security parameter index, default is:%d
\n
"
,
rpcInit
.
spi
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
rpcDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
...
...
source/libs/transport/test/rserver.c
浏览文件 @
f7cc44d6
...
...
@@ -123,7 +123,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
afp
=
retrieveAuthInfo
;
rpcDebugFlag
=
131
;
...
...
source/libs/transport/test/syncClient.c
浏览文件 @
f7cc44d6
...
...
@@ -21,15 +21,15 @@
#include "tutil.h"
typedef
struct
{
int
index
;
SEpSet
epSet
;
int
num
;
int
numOfReqs
;
int
msgSize
;
tsem_t
rspSem
;
tsem_t
*
pOverSem
;
int
index
;
SEpSet
epSet
;
int
num
;
int
numOfReqs
;
int
msgSize
;
tsem_t
rspSem
;
tsem_t
*
pOverSem
;
TdThread
thread
;
void
*
pRpc
;
void
*
pRpc
;
}
SInfo
;
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
info
.
ahandle
;
...
...
@@ -103,7 +103,7 @@ int main(int argc, char *argv[]) {
char
secret
[
20
]
=
"mypassword"
;
struct
timeval
systemTime
;
int64_t
startTime
,
endTime
;
TdThreadAttr
thattr
;
TdThreadAttr
thattr
;
// server info
epSet
.
inUse
=
0
;
...
...
@@ -119,9 +119,6 @@ int main(int argc, char *argv[]) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
secret
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
...
...
@@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-u"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
user
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
secret
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-spi"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
spi
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
...
...
@@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
printf
(
" [-n requests]: number of requests per thread, default is:%d
\n
"
,
numOfReqs
);
printf
(
" [-o compSize]: compression message size, default is:%d
\n
"
,
tsCompressMsgSize
);
printf
(
" [-u user]: user name for the connection, default is:%s
\n
"
,
rpcInit
.
user
);
printf
(
" [-k secret]: password for the connection, default is:%s
\n
"
,
rpcInit
.
secret
);
printf
(
" [-spi SPI]: security parameter index, default is:%d
\n
"
,
rpcInit
.
spi
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
rpcDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
...
...
source/libs/transport/test/transUT.cpp
浏览文件 @
f7cc44d6
...
...
@@ -50,9 +50,6 @@ class Client {
rpcInit_
.
numOfThreads
=
nThread
;
rpcInit_
.
cfp
=
processResp
;
rpcInit_
.
user
=
(
char
*
)
user
;
rpcInit_
.
secret
=
(
char
*
)
secret
;
rpcInit_
.
ckey
=
(
char
*
)
ckey
;
rpcInit_
.
spi
=
1
;
rpcInit_
.
parent
=
this
;
rpcInit_
.
connType
=
TAOS_CONN_CLIENT
;
this
->
transCli
=
rpcOpen
(
&
rpcInit_
);
...
...
@@ -117,9 +114,6 @@ class Server {
rpcInit_
.
numOfThreads
=
5
;
rpcInit_
.
cfp
=
processReq
;
rpcInit_
.
user
=
(
char
*
)
user
;
rpcInit_
.
secret
=
(
char
*
)
secret
;
rpcInit_
.
ckey
=
(
char
*
)
ckey
;
rpcInit_
.
spi
=
1
;
rpcInit_
.
connType
=
TAOS_CONN_SERVER
;
}
void
Start
()
{
...
...
tests/system-test/7-tmq/subscribeStb.py
0 → 100644
浏览文件 @
f7cc44d6
此差异已折叠。
点击以展开。
tests/system-test/fulltest.sh
浏览文件 @
f7cc44d6
...
...
@@ -64,3 +64,4 @@ python3 ./test.py -f 2-query/nestedQuery.py
python3 ./test.py
-f
7-tmq/basic5.py
python3 ./test.py
-f
7-tmq/subscribeDb.py
python3 ./test.py
-f
7-tmq/subscribeDb1.py
python3 ./test.py
-f
7-tmq/subscribeStb.py
tests/test/c/tmqSim.c
浏览文件 @
f7cc44d6
...
...
@@ -98,16 +98,28 @@ static void printHelp() {
exit
(
EXIT_SUCCESS
);
}
char
*
getCurrentTimeString
(
char
*
timeString
)
{
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
sprintf
(
timeString
,
"%d-%02d-%02d %02d:%02d:%02d"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
return
timeString
;
}
void
initLogFile
()
{
time_t
now
;
struct
tm
curTime
;
char
filename
[
256
];
now
=
taosTime
(
NULL
);
taosLocalTime
(
&
now
,
&
curTime
);
sprintf
(
filename
,
"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt"
,
configDir
,
curTime
.
tm_year
+
1900
,
curTime
.
tm_mon
+
1
,
curTime
.
tm_mday
,
curTime
.
tm_hour
,
curTime
.
tm_min
,
curTime
.
tm_sec
);
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
char
filename
[
256
];
char
tmpString
[
128
];
sprintf
(
filename
,
"%s/../log/tmqlog_%s.txt"
,
configDir
,
getCurrentTimeString
(
tmpString
));
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr
pFile
=
taosOpenFile
(
filename
,
TD_FILE_TEXT
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
filename
);
...
...
@@ -117,9 +129,6 @@ void initLogFile() {
}
void
saveConfigToLogFile
()
{
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
taosFprintfFile
(
g_fp
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
g_fp
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
...
...
@@ -144,10 +153,11 @@ void saveConfigToLogFile() {
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" expect rows: %d
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
}
taosFprintfFile
(
g_fp
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"# Test time: %s
\n
"
,
getCurrentTimeString
(
tmpString
)
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
}
...
...
@@ -316,10 +326,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (now, %d, %"
PRId64
", %"
PRId64
", %d)"
,
g_stConfInfo
.
cdbName
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
,
pInfo
->
checkresult
);
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
g_fp
,
"# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
sqlStr
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s, consume id %d result: %s
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
,
sqlStr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -339,6 +347,9 @@ void loop_consume(SThreadInfo* pInfo) {
int64_t
totalMsgs
=
0
;
int64_t
totalRows
=
0
;
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s consumer id %d start to loop pull msg
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
);
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
g_stConfInfo
.
consumeDelay
*
1000
);
if
(
tmqMsg
)
{
...
...
@@ -351,11 +362,13 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs
++
;
if
(
totalRows
>=
pInfo
->
expectMsgCnt
)
{
taosFprintfFile
(
g_fp
,
"==== totalRows >= pInfo->expectMsgCnt, so break
\n
"
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s over than expect rows, so break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
break
;
}
}
else
{
taosFprintfFile
(
g_fp
,
"==== delay over time, so break
\n
"
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s no poll more msg when time over, break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
break
;
}
}
...
...
tools/shell/src/shellNettest.c
浏览文件 @
f7cc44d6
...
...
@@ -31,9 +31,6 @@ static void shellWorkAsClient() {
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"_dnd"
;
rpcInit
.
ckey
=
"_key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
pass
;
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
clientRpc
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录