Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
17ccfe4a
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
17ccfe4a
编写于
10月 14, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
10月 14, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3826 from taosdata/feature/query
Feature/query
上级
d8a239bf
9864f6eb
变更
32
隐藏空白更改
内联
并排
Showing
32 changed file
with
385 addition
and
274 deletion
+385
-274
src/balance/src/balance.c
src/balance/src/balance.c
+1
-0
src/client/inc/tscLocalMerge.h
src/client/inc/tscLocalMerge.h
+0
-7
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+0
-3
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+9
-2
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+6
-22
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+1
-1
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+1
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+12
-6
src/client/src/tscServer.c
src/client/src/tscServer.c
+37
-17
src/client/src/tscSql.c
src/client/src/tscSql.c
+3
-5
src/client/src/tscStream.c
src/client/src/tscStream.c
+1
-1
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+102
-125
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+10
-15
src/connector/go
src/connector/go
+1
-1
src/inc/taosdef.h
src/inc/taosdef.h
+5
-3
src/inc/taosmsg.h
src/inc/taosmsg.h
+5
-0
src/mnode/inc/mnodeProfile.h
src/mnode/inc/mnodeProfile.h
+3
-1
src/mnode/src/mnodeCluster.c
src/mnode/src/mnodeCluster.c
+2
-1
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+1
-1
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+4
-2
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+1
-1
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+55
-24
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+5
-3
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+47
-15
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+1
-1
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+2
-1
src/os/inc/osSemphone.h
src/os/inc/osSemphone.h
+2
-0
src/os/src/detail/osSemphone.c
src/os/src/detail/osSemphone.c
+26
-0
src/os/src/detail/osSysinfo.c
src/os/src/detail/osSysinfo.c
+2
-2
src/os/src/windows/wSemphone.c
src/os/src/windows/wSemphone.c
+18
-0
src/util/src/tcache.c
src/util/src/tcache.c
+21
-14
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+1
-0
未找到文件。
src/balance/src/balance.c
浏览文件 @
17ccfe4a
...
...
@@ -937,6 +937,7 @@ static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows,
mnodeDecDnodeRef
(
pDnode
);
}
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
src/client/inc/tscLocalMerge.h
浏览文件 @
17ccfe4a
...
...
@@ -72,17 +72,10 @@ typedef struct SLocalReducer {
bool
orderPrjOnSTable
;
// projection query on stable
}
SLocalReducer
;
typedef
struct
SSubqueryState
{
int32_t
numOfRemain
;
// the number of remain unfinished subquery
int32_t
numOfTotal
;
// the number of total sub-queries
uint64_t
numOfRetrievedRows
;
// total number of points in this query
}
SSubqueryState
;
typedef
struct
SRetrieveSupport
{
tExtMemBuffer
**
pExtMemBuffer
;
// for build loser tree
tOrderDescriptor
*
pOrderDescriptor
;
SColumnModel
*
pFinalColModel
;
// colModel for final result
SSubqueryState
*
pState
;
int32_t
subqueryIndex
;
// index of current vnode in vnode list
SSqlObj
*
pParentSql
;
tFilePage
*
localBuffer
;
// temp buffer, there is a buffer for each vnode to
...
...
src/client/inc/tscUtil.h
浏览文件 @
17ccfe4a
...
...
@@ -66,7 +66,6 @@ typedef struct STidTags {
#pragma pack(pop)
typedef
struct
SJoinSupporter
{
SSubqueryState
*
pState
;
SSqlObj
*
pObj
;
// parent SqlObj
int32_t
subqueryIndex
;
// index of sub query
SInterval
interval
;
...
...
@@ -207,8 +206,6 @@ void tscTagCondRelease(STagCond* pCond);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQueryInfo
*
pQueryInfo
);
void
tscSetFreeHeatBeat
(
STscObj
*
pObj
);
bool
tscShouldFreeHeartBeat
(
SSqlObj
*
pHb
);
bool
tscShouldBeFreed
(
SSqlObj
*
pSql
);
STableMetaInfo
*
tscGetTableMetaInfoFromCmd
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
...
...
src/client/inc/tsclient.h
浏览文件 @
17ccfe4a
...
...
@@ -334,6 +334,12 @@ typedef struct STscObj {
T_REF_DECLARE
()
}
STscObj
;
typedef
struct
SSubqueryState
{
int32_t
numOfRemain
;
// the number of remain unfinished subquery
int32_t
numOfSub
;
// the number of total sub-queries
uint64_t
numOfRetrievedRows
;
// total number of points in this query
}
SSubqueryState
;
typedef
struct
SSqlObj
{
void
*
signature
;
pthread_t
owner
;
// owner of sql object, by which it is executed
...
...
@@ -355,10 +361,11 @@ typedef struct SSqlObj {
tsem_t
rspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
uint16_t
numOfSubs
;
SSubqueryState
subState
;
struct
SSqlObj
**
pSubs
;
struct
SSqlObj
*
prev
,
*
next
;
struct
SSqlObj
*
prev
,
*
next
;
struct
SSqlObj
**
self
;
}
SSqlObj
;
...
...
src/client/src/tscAsync.c
浏览文件 @
17ccfe4a
...
...
@@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
(
*
pSql
->
fetchFp
)(
pSql
->
param
,
pSql
,
pRes
->
tsrow
);
}
void
tscProcessAsyncRes
(
SSchedMsg
*
pMsg
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
assert
(
pSql
->
fp
!=
NULL
&&
pSql
->
fetchFp
!=
NULL
);
pSql
->
fp
=
pSql
->
fetchFp
;
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
pRes
->
code
);
}
// this function will be executed by queue task threads, so the terrno is not valid
static
void
tscProcessAsyncError
(
SSchedMsg
*
pMsg
)
{
void
(
*
fp
)()
=
pMsg
->
ahandle
;
...
...
@@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscDebug
(
"%p SqlObj is freed, not add into queue async res"
,
pSql
);
return
;
}
else
{
tscError
(
"%p add into queued async res, code:%s"
,
pSql
,
tstrerror
(
pSql
->
res
.
code
));
}
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
tscProcessAsyncRes
;
schedMsg
.
ahandle
=
pSql
;
schedMsg
.
thandle
=
(
void
*
)
1
;
schedMsg
.
msg
=
NULL
;
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
tscError
(
"%p add into queued async res, code:%s"
,
pSql
,
tstrerror
(
pSql
->
res
.
code
));
void
tscProcessAsyncFree
(
SSchedMsg
*
pMsg
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
tscDebug
(
"%p sql is freed"
,
pSql
);
taos_free_result
(
pSql
);
SSqlRes
*
pRes
=
&
pSql
->
res
;
assert
(
pSql
->
fp
!=
NULL
&&
pSql
->
fetchFp
!=
NULL
);
pSql
->
fp
=
pSql
->
fetchFp
;
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
pRes
->
code
);
}
int
tscSendMsgToServer
(
SSqlObj
*
pSql
);
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
17ccfe4a
...
...
@@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
(
*
pMemBuffer
)
=
(
tExtMemBuffer
**
)
malloc
(
POINTER_BYTES
*
pSql
->
numOfSubs
);
(
*
pMemBuffer
)
=
(
tExtMemBuffer
**
)
malloc
(
POINTER_BYTES
*
pSql
->
subState
.
numOfSub
);
if
(
*
pMemBuffer
==
NULL
)
{
tscError
(
"%p failed to allocate memory"
,
pSql
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
src/client/src/tscProfile.c
浏览文件 @
17ccfe4a
...
...
@@ -242,6 +242,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pQdesc
->
stime
=
htobe64
(
pSql
->
stime
);
pQdesc
->
queryId
=
htonl
(
pSql
->
queryId
);
pQdesc
->
useconds
=
htobe64
(
pSql
->
res
.
useconds
);
pQdesc
->
qHandle
=
htobe64
(
pSql
->
res
.
qhandle
);
pHeartbeat
->
numOfQueries
++
;
pQdesc
++
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
17ccfe4a
...
...
@@ -1605,8 +1605,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
setExprInfoForFunctions
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
SSchema
*
pSchema
,
SConvertFunc
cvtFunc
,
char
*
aliasName
,
int32_t
resColIdx
,
SColumnIndex
*
pColIndex
)
{
static
int32_t
setExprInfoForFunctions
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
SSchema
*
pSchema
,
SConvertFunc
cvtFunc
,
char
*
aliasName
,
int32_t
resColIdx
,
SColumnIndex
*
pColIndex
,
bool
finalResult
)
{
const
char
*
msg1
=
"not support column types"
;
int16_t
type
=
0
;
...
...
@@ -1652,8 +1652,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
SColumnIndex
index
=
{.
tableIndex
=
pColIndex
->
tableIndex
,
.
columnIndex
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
};
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
);
// if it is not in the final result, do not add it
SColumnList
ids
=
getColumnList
(
1
,
pColIndex
->
tableIndex
,
pColIndex
->
columnIndex
);
insertResultField
(
pQueryInfo
,
resColIdx
,
&
ids
,
bytes
,
(
int8_t
)
type
,
columnName
,
pExpr
);
if
(
finalResult
)
{
insertResultField
(
pQueryInfo
,
resColIdx
,
&
ids
,
bytes
,
(
int8_t
)
type
,
columnName
,
pExpr
);
}
else
{
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
(
ids
.
ids
[
0
]));
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1928,7 +1933,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for
(
int32_t
j
=
0
;
j
<
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
++
j
)
{
index
.
columnIndex
=
j
;
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
++
,
&
index
)
!=
0
)
{
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
++
,
&
index
,
finalResult
)
!=
0
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
...
@@ -1945,7 +1950,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if
((
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
))
||
(
index
.
columnIndex
<
0
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
+
i
,
&
index
)
!=
0
)
{
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
+
i
,
&
index
,
finalResult
)
!=
0
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
...
...
@@ -1982,7 +1988,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for
(
int32_t
i
=
0
;
i
<
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
++
i
)
{
SColumnIndex
index
=
{.
tableIndex
=
j
,
.
columnIndex
=
i
};
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
,
&
index
)
!=
0
)
{
if
(
setExprInfoForFunctions
(
pCmd
,
pQueryInfo
,
pSchema
,
cvtFunc
,
pItem
->
aliasName
,
colIndex
,
&
index
,
finalResult
)
!=
0
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
...
...
src/client/src/tscServer.c
浏览文件 @
17ccfe4a
...
...
@@ -24,7 +24,6 @@
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
#include "tlockfree.h"
SRpcCorEpSet
tscMgmtEpSet
;
...
...
@@ -198,15 +197,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
return
;
}
if
(
tscShouldFreeHeartBeat
(
pHB
))
{
tscDebug
(
"%p free HB object and release connection"
,
pHB
);
pObj
->
pHb
=
0
;
taos_free_result
(
pHB
);
}
else
{
int32_t
code
=
tscProcessSql
(
pHB
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p failed to sent HB to server, reason:%s"
,
pHB
,
tstrerror
(
code
));
}
void
**
p
=
taosCacheAcquireByKey
(
tscObjCache
,
&
pHB
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
p
==
NULL
)
{
tscWarn
(
"%p HB object has been released already"
,
pHB
);
return
;
}
assert
(
*
pHB
->
self
==
pHB
);
int32_t
code
=
tscProcessSql
(
pHB
);
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p failed to sent HB to server, reason:%s"
,
pHB
,
tstrerror
(
code
));
}
}
...
...
@@ -474,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) {
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
for
(
int
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
// NOTE: pSub may have been released already here
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
pSub
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
if
(
pSub
->
pRpcCtx
!=
NULL
)
{
rpcCancelRequest
(
pSub
->
pRpcCtx
);
pSub
->
pRpcCtx
=
NULL
;
void
**
p
=
taosCacheAcquireByKey
(
tscObjCache
,
&
pSub
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
p
==
NULL
)
{
continue
;
}
tscQueueAsyncRes
(
pSub
);
// async res? not other functions?
SSqlObj
*
pSubObj
=
(
SSqlObj
*
)
(
*
p
);
assert
(
pSubObj
->
self
==
(
SSqlObj
**
)
p
);
pSubObj
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
if
(
pSubObj
->
pRpcCtx
!=
NULL
)
{
rpcCancelRequest
(
pSubObj
->
pRpcCtx
);
pSubObj
->
pRpcCtx
=
NULL
;
}
tscQueueAsyncRes
(
pSubObj
);
// async res? not other functions?
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
false
);
}
tscDebug
(
"%p super table query cancelled"
,
pSql
);
...
...
@@ -1451,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
int
tscProcessRetrieveLocalMergeRsp
(
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
pRes
->
code
;
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1494,6 +1506,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMConnectMsg
*
pConnect
=
(
SCMConnectMsg
*
)
pCmd
->
payload
;
// TODO refactor full_name
char
*
db
;
// ugly code to move the space
db
=
strstr
(
pObj
->
db
,
TS_PATH_DELIMITER
);
db
=
(
db
==
NULL
)
?
pObj
->
db
:
db
+
1
;
...
...
@@ -1501,6 +1514,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tstrncpy
(
pConnect
->
clientVersion
,
version
,
sizeof
(
pConnect
->
clientVersion
));
tstrncpy
(
pConnect
->
msgVersion
,
""
,
sizeof
(
pConnect
->
msgVersion
));
pConnect
->
pid
=
htonl
(
taosGetPId
());
taosGetCurrentAPPName
(
pConnect
->
appName
,
NULL
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1653,6 +1669,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMHeartBeatMsg
*
pHeartbeat
=
(
SCMHeartBeatMsg
*
)
pCmd
->
payload
;
pHeartbeat
->
numOfQueries
=
numOfQueries
;
pHeartbeat
->
numOfStreams
=
numOfStreams
;
pHeartbeat
->
pid
=
htonl
(
taosGetPId
());
taosGetCurrentAPPName
(
pHeartbeat
->
appName
,
NULL
);
int
msgLen
=
tscBuildQueryStreamDesc
(
pHeartbeat
,
pObj
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
...
...
src/client/src/tscSql.c
浏览文件 @
17ccfe4a
...
...
@@ -20,13 +20,13 @@
#include "tcache.h"
#include "tnote.h"
#include "trpc.h"
#include "ttimer.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "ttokendef.h"
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
static
bool
validImpl
(
const
char
*
str
,
size_t
maxsize
)
{
...
...
@@ -261,9 +261,6 @@ void taos_close(TAOS *taos) {
return
;
}
pObj
->
signature
=
NULL
;
taosTmrStopA
(
&
(
pObj
->
pTimer
));
SSqlObj
*
pHb
=
pObj
->
pHb
;
if
(
pHb
!=
NULL
&&
atomic_val_compare_exchange_ptr
(
&
pObj
->
pHb
,
pHb
,
0
)
==
pHb
)
{
if
(
pHb
->
pRpcCtx
!=
NULL
)
{
// wait for rsp from dnode
...
...
@@ -463,6 +460,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
->
qhandle
==
0
||
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pCmd
->
command
==
TSDB_SQL_INSERT
)
{
return
NULL
;
...
...
@@ -526,7 +524,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pRes->numOfClauseTotal = 0;
pRes->rspType = 0;
pSql->
numOfSubs
= 0;
pSql->
subState.numOfSub
= 0;
taosTFree(pSql->pSubs);
assert(pSql->fp == NULL);
...
...
src/client/src/tscStream.c
浏览文件 @
17ccfe4a
...
...
@@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
false
);
tscFreeSqlResult
(
pSql
);
taosTFree
(
pSql
->
pSubs
);
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
taosTFree
(
pTableMetaInfo
->
vgroupList
);
tscSetNextLaunchTimer
(
pStream
,
pSql
);
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
17ccfe4a
...
...
@@ -26,7 +26,6 @@
#include "tscSubquery.h"
typedef
struct
SInsertSupporter
{
SSubqueryState
*
pState
;
SSqlObj
*
pSql
;
int32_t
index
;
}
SInsertSupporter
;
...
...
@@ -174,7 +173,6 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
}
pSupporter
->
pObj
=
pSql
;
pSupporter
->
pState
=
pState
;
pSupporter
->
subqueryIndex
=
index
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
...
...
@@ -250,7 +248,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
SJoinSupporter
*
pSupporter
=
NULL
;
//If the columns are not involved in the final select clause, the corresponding query will not be issued.
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
pSupporter
=
pSql
->
pSubs
[
i
]
->
param
;
if
(
taosArrayGetSize
(
pSupporter
->
exprList
)
>
0
)
{
++
numOfSub
;
...
...
@@ -260,16 +258,15 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
assert
(
numOfSub
>
0
);
// scan all subquery, if one sub query has only ts, ignore it
tscDebug
(
"%p start to launch secondary subqueries, total:%d, only:%d needs to query"
,
pSql
,
pSql
->
numOfSubs
,
numOfSub
);
tscDebug
(
"%p start to launch secondary subqueries, total:%d, only:%d needs to query"
,
pSql
,
pSql
->
subState
.
numOfSub
,
numOfSub
);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState
*
pState
=
pSupporter
->
pState
;
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
numOfSub
;
SSubqueryState
*
pState
=
&
pSql
->
subState
;
pState
->
numOfRemain
=
pState
->
numOfSub
;
bool
success
=
true
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pPrevSub
=
pSql
->
pSubs
[
i
];
pSql
->
pSubs
[
i
]
=
NULL
;
...
...
@@ -322,7 +319,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
memset
(
&
pSupporter
->
fieldsInfo
,
0
,
sizeof
(
SFieldInfo
));
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoDetail
(
&
pNew
->
cmd
,
0
);
assert
(
pNew
->
numOfSubs
==
0
&&
pNew
->
cmd
.
numOfClause
==
1
&&
pNewQueryInfo
->
numOfTables
==
1
);
assert
(
pNew
->
subState
.
numOfSub
==
0
&&
pNew
->
cmd
.
numOfClause
==
1
&&
pNewQueryInfo
->
numOfTables
==
1
);
tscFieldInfoUpdateOffset
(
pNewQueryInfo
);
...
...
@@ -373,13 +370,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if
(
!
success
)
{
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscError
(
"%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d"
,
pSql
,
pSql
->
numOfSubs
,
pSql
->
res
.
code
);
pSql
->
subState
.
numOfSub
,
pSql
->
res
.
code
);
freeJoinSubqueryObj
(
pSql
);
return
pSql
->
res
.
code
;
}
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
continue
;
}
...
...
@@ -391,17 +388,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
}
void
freeJoinSubqueryObj
(
SSqlObj
*
pSql
)
{
SSubqueryState
*
pState
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
SJoinSupporter
*
p
=
pSub
->
param
;
pState
=
p
->
pState
;
tscDestroyJoinSupporter
(
p
);
if
(
pSub
->
res
.
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -409,14 +402,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
}
}
taosTFree
(
pState
);
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
}
static
void
quitAllSubquery
(
SSqlObj
*
pSqlObj
,
SJoinSupporter
*
pSupporter
)
{
assert
(
pS
upporter
->
pState
->
numOfRemain
>
0
);
assert
(
pS
qlObj
->
subState
.
numOfRemain
>
0
);
if
(
atomic_sub_fetch_32
(
&
pS
upporter
->
pState
->
numOfRemain
,
1
)
<=
0
)
{
if
(
atomic_sub_fetch_32
(
&
pS
qlObj
->
subState
.
numOfRemain
,
1
)
<=
0
)
{
tscError
(
"%p all subquery return and query failed, global code:%d"
,
pSqlObj
,
pSqlObj
->
res
.
code
);
freeJoinSubqueryObj
(
pSqlObj
);
}
...
...
@@ -680,7 +672,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if
(
atomic_sub_fetch_32
(
&
p
Supporter
->
pState
->
numOfRemain
,
1
)
>
0
)
{
if
(
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
)
>
0
)
{
return
;
}
...
...
@@ -716,10 +708,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STableMetaInfo
*
pTableMetaInfo2
=
tscGetMetaInfo
(
pQueryInfo2
,
0
);
tscBuildVgroupTableInfo
(
pParentSql
,
pTableMetaInfo2
,
s2
);
p
Supporter
->
pState
->
numOfTotal
=
2
;
p
Supporter
->
pState
->
numOfRemain
=
pSupporter
->
pState
->
numOfTotal
;
p
ParentSql
->
subState
.
numOfSub
=
2
;
p
ParentSql
->
subState
.
numOfRemain
=
pParentSql
->
subState
.
numOfSub
;
for
(
int32_t
m
=
0
;
m
<
pParentSql
->
numOfSubs
;
++
m
)
{
for
(
int32_t
m
=
0
;
m
<
pParentSql
->
subState
.
numOfSub
;
++
m
)
{
SSqlObj
*
sub
=
pParentSql
->
pSubs
[
m
];
issueTSCompQuery
(
sub
,
sub
->
param
,
pParentSql
);
}
...
...
@@ -818,7 +810,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return
;
}
if
(
atomic_sub_fetch_32
(
&
p
Supporter
->
pState
->
numOfRemain
,
1
)
>
0
)
{
if
(
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
)
>
0
)
{
return
;
}
...
...
@@ -850,7 +842,6 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SJoinSupporter
*
pSupporter
=
(
SJoinSupporter
*
)
param
;
SSqlObj
*
pParentSql
=
pSupporter
->
pObj
;
SSubqueryState
*
pState
=
pSupporter
->
pState
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -871,6 +862,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pRes
->
numOfTotal
+=
pRes
->
numOfRows
;
}
SSubqueryState
*
pState
=
&
pParentSql
->
subState
;
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
numOfRows
==
0
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
...
...
@@ -878,7 +870,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// for projection query, need to try next vnode if current vnode is exhausted
if
((
++
pTableMetaInfo
->
vgroupIndex
)
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
)
{
pState
->
numOfRemain
=
1
;
pState
->
numOf
Total
=
1
;
pState
->
numOf
Sub
=
1
;
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pSql
->
fp
=
tscJoinQueryCallback
;
...
...
@@ -888,12 +880,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
}
if
(
atomic_sub_fetch_32
(
&
p
State
->
numOfRemain
,
1
)
>
0
)
{
tscDebug
(
"%p sub:%p completed, remain:%d, total:%d"
,
pParentSql
,
tres
,
p
State
->
numOfRemain
,
pState
->
numOfTotal
);
if
(
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
)
>
0
)
{
tscDebug
(
"%p sub:%p completed, remain:%d, total:%d"
,
pParentSql
,
tres
,
p
ParentSql
->
subState
.
numOfRemain
,
pState
->
numOfSub
);
return
;
}
tscDebug
(
"%p all %d secondary subqueries retrieval completed, code:%d"
,
tres
,
pState
->
numOf
Total
,
pParentSql
->
res
.
code
);
tscDebug
(
"%p all %d secondary subqueries retrieval completed, code:%d"
,
tres
,
pState
->
numOf
Sub
,
pParentSql
->
res
.
code
);
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
freeJoinSubqueryObj
(
pParentSql
);
...
...
@@ -901,7 +893,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
// update the records for each subquery in parent sql object.
for
(
int32_t
i
=
0
;
i
<
p
ParentSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
State
->
numOfSub
;
++
i
)
{
if
(
pParentSql
->
pSubs
[
i
]
==
NULL
)
{
continue
;
}
...
...
@@ -917,32 +909,26 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
static
SJoinSupporter
*
tscUpdateSubqueryStatus
(
SSqlObj
*
pSql
,
int32_t
numOfFetch
)
{
int32_t
notInvolved
=
0
;
SJoinSupporter
*
pSupporter
=
NULL
;
SSubqueryState
*
pState
=
NULL
;
SSubqueryState
*
pState
=
&
pSql
->
subState
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
notInvolved
++
;
}
else
{
pSupporter
=
(
SJoinSupporter
*
)
pSql
->
pSubs
[
i
]
->
param
;
pState
=
pSupporter
->
pState
;
}
}
assert
(
pState
!=
NULL
);
if
(
pState
!=
NULL
)
{
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
numOfFetch
;
}
pState
->
numOfRemain
=
numOfFetch
;
return
pSupporter
;
}
void
tscFetchDatablockFromSubquery
(
SSqlObj
*
pSql
)
{
assert
(
pSql
->
numOfSubs
>=
1
);
assert
(
pSql
->
subState
.
numOfSub
>=
1
);
int32_t
numOfFetch
=
0
;
bool
hasData
=
true
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
// if the subquery is NULL, it does not involved in the final result generation
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
...
...
@@ -989,7 +975,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
tscDebug
(
"%p retrieve data from %d subqueries"
,
pSql
,
numOfFetch
);
SJoinSupporter
*
pSupporter
=
tscUpdateSubqueryStatus
(
pSql
,
numOfFetch
);
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSql1
=
pSql
->
pSubs
[
i
];
if
(
pSql1
==
NULL
)
{
continue
;
...
...
@@ -1124,7 +1110,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
}
// wait for the other subqueries response from vnode
if
(
atomic_sub_fetch_32
(
&
p
Supporter
->
pState
->
numOfRemain
,
1
)
>
0
)
{
if
(
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
)
>
0
)
{
return
;
}
...
...
@@ -1136,7 +1122,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
p
Supporter
->
pState
->
numOfRemain
=
pSupporter
->
pState
->
numOfTotal
;
// reset the record value
p
ParentSql
->
subState
.
numOfRemain
=
pParentSql
->
subState
.
numOfSub
;
// reset the record value
pSql
->
fp
=
joinRetrieveFinalResCallback
;
// continue retrieve data
pSql
->
cmd
.
command
=
TSDB_SQL_FETCH
;
...
...
@@ -1165,7 +1151,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
assert
(
pSql
->
res
.
numOfRows
==
0
);
if
(
pSql
->
pSubs
==
NULL
)
{
pSql
->
pSubs
=
calloc
(
pS
upporter
->
pState
->
numOfTotal
,
POINTER_BYTES
);
pSql
->
pSubs
=
calloc
(
pS
ql
->
subState
.
numOfSub
,
POINTER_BYTES
);
if
(
pSql
->
pSubs
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -1176,8 +1162,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pSql
->
pSubs
[
pSql
->
numOfSubs
++
]
=
pNew
;
assert
(
pSql
->
numOfSubs
<=
pSupporter
->
pState
->
numOfTotal
);
pSql
->
pSubs
[
pSql
->
subState
.
numOfRemain
++
]
=
pNew
;
assert
(
pSql
->
subState
.
numOfRemain
<=
pSql
->
subState
.
numOfSub
);
if
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
))
{
addGroupInfoForSubquery
(
pSql
,
pNew
,
0
,
tableIndex
);
...
...
@@ -1221,7 +1207,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pNewQueryInfo
,
0
);
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
// return the tableId & tag
SColumnIndex
i
ndex
=
{
0
};
SColumnIndex
colI
ndex
=
{
0
};
STagCond
*
pTagCond
=
&
pSupporter
->
tagCond
;
assert
(
pTagCond
->
joinInfo
.
hasJoin
);
...
...
@@ -1234,7 +1220,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SSchema
*
pSchema
=
tscGetTableTagSchema
(
pTableMetaInfo
->
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
if
(
pSchema
[
i
].
colId
==
tagColId
)
{
i
ndex
.
columnIndex
=
i
;
colI
ndex
.
columnIndex
=
i
;
break
;
}
}
...
...
@@ -1251,18 +1237,18 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type
TSDB_QUERY_SET_TYPE
(
pNewQueryInfo
->
type
,
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
);
tscAddSpecialColumnForSelect
(
pNewQueryInfo
,
0
,
TSDB_FUNC_TID_TAG
,
&
i
ndex
,
&
s1
,
TSDB_COL_TAG
);
tscAddSpecialColumnForSelect
(
pNewQueryInfo
,
0
,
TSDB_FUNC_TID_TAG
,
&
colI
ndex
,
&
s1
,
TSDB_COL_TAG
);
size_t
numOfCols
=
taosArrayGetSize
(
pNewQueryInfo
->
colList
);
tscDebug
(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%"
PRIzu
", colList:%"
PRIzu
", fieldsInfo:%d, tagIndex:%d, name:%s"
,
pSql
,
pNew
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
,
pNewQueryInfo
->
type
,
tscSqlExprNumOfExprs
(
pNewQueryInfo
),
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
,
i
ndex
.
columnIndex
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
,
colI
ndex
.
columnIndex
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
}
else
{
SSchema
colSchema
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
1
};
SColumnIndex
i
ndex
=
{
0
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
};
tscAddSpecialColumnForSelect
(
pNewQueryInfo
,
0
,
TSDB_FUNC_TS_COMP
,
&
i
ndex
,
&
colSchema
,
TSDB_COL_NORMAL
);
SColumnIndex
colI
ndex
=
{
0
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
};
tscAddSpecialColumnForSelect
(
pNewQueryInfo
,
0
,
TSDB_FUNC_TS_COMP
,
&
colI
ndex
,
&
colSchema
,
TSDB_COL_NORMAL
);
// set the tags value for ts_comp function
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pNewQueryInfo
,
0
);
...
...
@@ -1320,8 +1306,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
goto
_error
;
}
pState
->
numOfTotal
=
pQueryInfo
->
numOfTables
;
pState
->
numOfRemain
=
pState
->
numOfTotal
;
pSql
->
subState
.
numOfSub
=
pQueryInfo
->
numOfTables
;
bool
hasEmptySub
=
false
;
...
...
@@ -1354,10 +1339,10 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
pSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
0
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
((
code
=
tscProcessSql
(
pSub
))
!=
TSDB_CODE_SUCCESS
)
{
pS
tate
->
numOfRemain
=
i
-
1
;
// the already sent reques
will continue and do not go to the error process routine
pS
ql
->
subState
.
numOfRemain
=
i
-
1
;
// the already sent request
will continue and do not go to the error process routine
break
;
}
}
...
...
@@ -1373,7 +1358,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
static
void
doCleanupSubqueries
(
SSqlObj
*
pSql
,
int32_t
numOfSubs
,
SSubqueryState
*
pState
)
{
assert
(
numOfSubs
<=
pSql
->
numOfSubs
&&
numOfSubs
>=
0
&&
pState
!=
NULL
);
assert
(
numOfSubs
<=
pSql
->
subState
.
numOfSub
&&
numOfSubs
>=
0
&&
pState
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
numOfSubs
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
...
...
@@ -1411,8 +1396,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
pSql
->
numOfSubs
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
assert
(
pSql
->
numOfSubs
>
0
);
pSql
->
subState
.
numOfSub
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
assert
(
pSql
->
subState
.
numOfSub
>
0
);
int32_t
ret
=
tscLocalReducerEnvCreate
(
pSql
,
&
pMemoryBuf
,
&
pDesc
,
&
pModel
,
nBufferSize
);
if
(
ret
!=
0
)
{
...
...
@@ -1422,28 +1407,26 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return
ret
;
}
pSql
->
pSubs
=
calloc
(
pSql
->
numOfSubs
,
POINTER_BYTES
);
pSql
->
pSubs
=
calloc
(
pSql
->
subState
.
numOfSub
,
POINTER_BYTES
);
tscDebug
(
"%p retrieved query data from %d vnode(s)"
,
pSql
,
pSql
->
numOfSubs
);
tscDebug
(
"%p retrieved query data from %d vnode(s)"
,
pSql
,
pSql
->
subState
.
numOfSub
);
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
if
(
pSql
->
pSubs
==
NULL
||
pState
==
NULL
)
{
taosTFree
(
pState
);
taosTFree
(
pSql
->
pSubs
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
numOfSubs
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
subState
.
numOfSub
);
tscQueueAsyncRes
(
pSql
);
return
ret
;
}
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
pSql
->
numOfSubs
;
pSql
->
subState
.
numOfRemain
=
pSql
->
subState
.
numOfSub
;
pRes
->
code
=
TSDB_CODE_SUCCESS
;
int32_t
i
=
0
;
for
(;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
calloc
(
1
,
sizeof
(
SRetrieveSupport
));
if
(
trs
==
NULL
)
{
tscError
(
"%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
...
...
@@ -1452,8 +1435,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs
->
pExtMemBuffer
=
pMemoryBuf
;
trs
->
pOrderDescriptor
=
pDesc
;
trs
->
pState
=
pState
;
trs
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
if
(
trs
->
localBuffer
==
NULL
)
{
tscError
(
"%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
...
...
@@ -1461,8 +1443,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
break
;
}
trs
->
subqueryIndex
=
i
;
trs
->
pParentSql
=
pSql
;
trs
->
subqueryIndex
=
i
;
trs
->
pParentSql
=
pSql
;
trs
->
pFinalColModel
=
pModel
;
SSqlObj
*
pNew
=
tscCreateSTableSubquery
(
pSql
,
trs
,
NULL
);
...
...
@@ -1483,42 +1465,39 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug
(
"%p sub:%p create subquery success. orderOfSub:%d"
,
pSql
,
pNew
,
trs
->
subqueryIndex
);
}
if
(
i
<
pSql
->
numOfSubs
)
{
if
(
i
<
pSql
->
subState
.
numOfSub
)
{
tscError
(
"%p failed to prepare subquery structure and launch subqueries"
,
pSql
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
numOfSubs
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
subState
.
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
,
pState
);
return
pRes
->
code
;
// free all allocated resource
}
if
(
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
)
{
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
numOfSubs
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
subState
.
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
,
pState
);
return
pRes
->
code
;
}
for
(
int32_t
j
=
0
;
j
<
pSql
->
numOfSubs
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pSql
->
subState
.
numOfSub
;
++
j
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
j
];
SRetrieveSupport
*
pSupport
=
pSub
->
param
;
tscDebug
(
"%p sub:%p launch subquery, orderOfSub:%d."
,
pSql
,
pSub
,
pSupport
->
subqueryIndex
);
tscProcessSql
(
pSub
);
}
return
TSDB_CODE_SUCCESS
;
}
static
void
tscFreeSubSqlObj
(
SRetrieveSupport
*
trsupport
,
SSqlObj
*
pSql
)
{
tscDebug
(
"%p start to free subquery obj"
,
pSql
);
int32_t
index
=
trsupport
->
subqueryIndex
;
SSqlObj
*
pParentSql
=
trsupport
->
pParentSql
;
//
int32_t index = trsupport->subqueryIndex;
//
SSqlObj *pParentSql = trsupport->pParentSql;
assert
(
pSql
==
pParentSql
->
pSubs
[
index
]);
// pParentSql->pSubs[index] = NULL;
//
// taos_free_result(pSql);
// assert(pSql == pParentSql->pSubs[index]);
taosTFree
(
trsupport
->
localBuffer
);
taosTFree
(
trsupport
);
}
...
...
@@ -1581,9 +1560,14 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
int32_t
subqueryIndex
=
trsupport
->
subqueryIndex
;
assert
(
pSql
!=
NULL
);
SSubqueryState
*
pState
=
trsupport
->
pState
;
assert
(
pState
->
numOfRemain
<=
pState
->
numOfTotal
&&
pState
->
numOfRemain
>=
0
&&
pParentSql
->
numOfSubs
==
pState
->
numOfTotal
);
SSubqueryState
*
pState
=
&
pParentSql
->
subState
;
int32_t
remain
=
pState
->
numOfRemain
;
int32_t
sub
=
pState
->
numOfSub
;
UNUSED
(
remain
);
UNUSED
(
sub
);
assert
(
pParentSql
->
subState
.
numOfRemain
<=
pState
->
numOfSub
&&
pParentSql
->
subState
.
numOfRemain
>=
0
);
// retrieved in subquery failed. OR query cancelled in retrieve phase.
if
(
taos_errno
(
pSql
)
==
TSDB_CODE_SUCCESS
&&
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1613,24 +1597,23 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
}
}
int32_t
remain
=
-
1
;
if
((
remain
=
atomic_sub_fetch_32
(
&
p
State
->
numOfRemain
,
1
))
>
0
)
{
remain
=
-
1
;
if
((
remain
=
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
))
>
0
)
{
tscDebug
(
"%p sub:%p orderOfSub:%d freed, finished subqueries:%d"
,
pParentSql
,
pSql
,
trsupport
->
subqueryIndex
,
pState
->
numOf
Total
-
remain
);
pState
->
numOf
Sub
-
remain
);
tscFreeSubSqlObj
(
trsupport
,
pSql
);
return
;
}
// all subqueries are failed
tscError
(
"%p retrieve from %d vnode(s) completed,code:%s.FAILED."
,
pParentSql
,
pState
->
numOf
Total
,
tscError
(
"%p retrieve from %d vnode(s) completed,code:%s.FAILED."
,
pParentSql
,
pState
->
numOf
Sub
,
tstrerror
(
pParentSql
->
res
.
code
));
// release allocated resource
tscLocalReducerEnvDestroy
(
trsupport
->
pExtMemBuffer
,
trsupport
->
pOrderDescriptor
,
trsupport
->
pFinalColModel
,
pState
->
numOf
Total
);
pState
->
numOf
Sub
);
taosTFree
(
trsupport
->
pState
);
tscFreeSubSqlObj
(
trsupport
,
pSql
);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
...
...
@@ -1650,7 +1633,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SSqlObj
*
pParentSql
=
trsupport
->
pParentSql
;
tOrderDescriptor
*
pDesc
=
trsupport
->
pOrderDescriptor
;
SSubqueryState
*
pState
=
trsupport
->
p
State
;
SSubqueryState
*
pState
=
&
pParentSql
->
sub
State
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
pQueryInfo
->
pTableMetaInfo
[
0
];
...
...
@@ -1687,9 +1670,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
}
int32_t
remain
=
-
1
;
if
((
remain
=
atomic_sub_fetch_32
(
&
p
State
->
numOfRemain
,
1
))
>
0
)
{
if
((
remain
=
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
))
>
0
)
{
tscDebug
(
"%p sub:%p orderOfSub:%d freed, finished subqueries:%d"
,
pParentSql
,
pSql
,
trsupport
->
subqueryIndex
,
pState
->
numOf
Total
-
remain
);
pState
->
numOf
Sub
-
remain
);
tscFreeSubSqlObj
(
trsupport
,
pSql
);
return
;
...
...
@@ -1699,20 +1682,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
pDesc
->
pColumnModel
->
capacity
=
trsupport
->
pExtMemBuffer
[
idx
]
->
numOfElemsPerPage
;
tscDebug
(
"%p retrieve from %d vnodes completed.final NumOfRows:%"
PRId64
",start to build loser tree"
,
pParentSql
,
pState
->
numOf
Total
,
pState
->
numOfRetrievedRows
);
pState
->
numOf
Sub
,
pState
->
numOfRetrievedRows
);
SQueryInfo
*
pPQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentSql
->
cmd
,
0
);
tscClearInterpInfo
(
pPQueryInfo
);
tscCreateLocalReducer
(
trsupport
->
pExtMemBuffer
,
pState
->
numOf
Total
,
pDesc
,
trsupport
->
pFinalColModel
,
pParentSql
);
tscCreateLocalReducer
(
trsupport
->
pExtMemBuffer
,
pState
->
numOf
Sub
,
pDesc
,
trsupport
->
pFinalColModel
,
pParentSql
);
tscDebug
(
"%p build loser tree completed"
,
pParentSql
);
pParentSql
->
res
.
precision
=
pSql
->
res
.
precision
;
pParentSql
->
res
.
numOfRows
=
0
;
pParentSql
->
res
.
row
=
0
;
// only free once
taosTFree
(
trsupport
->
pState
);
tscFreeSubSqlObj
(
trsupport
,
pSql
);
// set the command flag must be after the semaphore been correctly set.
...
...
@@ -1733,8 +1714,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert
(
tres
!=
NULL
);
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
SSubqueryState
*
pState
=
trsupport
->
p
State
;
assert
(
pState
->
numOfRemain
<=
pState
->
numOf
Total
&&
pState
->
numOfRemain
>=
0
&&
pParentSql
->
numOfSubs
==
pState
->
numOfTotal
);
SSubqueryState
*
pState
=
&
pParentSql
->
sub
State
;
assert
(
pState
->
numOfRemain
<=
pState
->
numOf
Sub
&&
pState
->
numOfRemain
>=
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
);
SCMVgroupInfo
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
];
...
...
@@ -1751,6 +1732,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
if
(
taos_errno
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
assert
(
numOfRows
==
taos_errno
(
pSql
));
if
(
numOfRows
==
TSDB_CODE_TSC_QUERY_CANCELLED
)
{
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
}
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
)
{
tscError
(
"%p sub:%p failed code:%s, retry:%d"
,
pParentSql
,
pSql
,
tstrerror
(
numOfRows
),
trsupport
->
numOfRetry
);
...
...
@@ -1822,7 +1807,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pNew
->
cmd
,
0
);
pQueryInfo
->
type
|=
TSDB_QUERY_TYPE_STABLE_SUBQUERY
;
assert
(
pQueryInfo
->
numOfTables
==
1
&&
pNew
->
cmd
.
numOfClause
==
1
&&
trsupport
->
subqueryIndex
<
pSql
->
numOfSubs
);
assert
(
pQueryInfo
->
numOfTables
==
1
&&
pNew
->
cmd
.
numOfClause
==
1
&&
trsupport
->
subqueryIndex
<
pSql
->
subState
.
numOfSub
);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
table_index
);
...
...
@@ -1893,7 +1878,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static
void
multiVnodeInsertFinalize
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SInsertSupporter
*
pSupporter
=
(
SInsertSupporter
*
)
param
;
SSqlObj
*
pParentObj
=
pSupporter
->
pSql
;
SSubqueryState
*
pState
=
pSupporter
->
pState
;
// record the total inserted rows
if
(
numOfRows
>
0
)
{
...
...
@@ -1908,15 +1892,13 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
taosTFree
(
pSupporter
);
if
(
atomic_sub_fetch_32
(
&
pState
->
numOfRemain
,
1
)
>
0
)
{
if
(
atomic_sub_fetch_32
(
&
pParentObj
->
subState
.
numOfRemain
,
1
)
>
0
)
{
return
;
}
tscDebug
(
"%p Async insertion completed, total inserted:%"
PRId64
,
pParentObj
,
pParentObj
->
res
.
numOfRows
);
// release data block data
taosTFree
(
pState
);
// restore user defined fp
pParentObj
->
fp
=
pParentObj
->
fetchFp
;
...
...
@@ -1937,7 +1919,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SInsertSupporter
*
pSupporter
=
(
SInsertSupporter
*
)
pSql
->
param
;
assert
(
pSupporter
->
index
<
pSupporter
->
pS
tate
->
numOfTotal
);
assert
(
pSupporter
->
index
<
pSupporter
->
pS
ql
->
subState
.
numOfSub
);
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
pSupporter
->
index
);
int32_t
code
=
tscCopyDataBlockToPayload
(
pSql
,
pTableDataBlock
);
...
...
@@ -1954,33 +1936,29 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pSql
->
numOfSubs
=
(
uint16_t
)
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
assert
(
pSql
->
numOfSubs
>
0
);
pSql
->
subState
.
numOfSub
=
(
uint16_t
)
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
assert
(
pSql
->
subState
.
numOfSub
>
0
);
pRes
->
code
=
TSDB_CODE_SUCCESS
;
// the number of already initialized subqueries
int32_t
numOfSub
=
0
;
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
pSql
->
numOfSubs
;
pSql
->
pSubs
=
calloc
(
pSql
->
numOfSubs
,
POINTER_BYTES
);
pSql
->
subState
.
numOfRemain
=
pSql
->
subState
.
numOfSub
;
pSql
->
pSubs
=
calloc
(
pSql
->
subState
.
numOfSub
,
POINTER_BYTES
);
if
(
pSql
->
pSubs
==
NULL
)
{
goto
_error
;
}
tscDebug
(
"%p submit data to %d vnode(s)"
,
pSql
,
pSql
->
numOfSubs
);
tscDebug
(
"%p submit data to %d vnode(s)"
,
pSql
,
pSql
->
subState
.
numOfSub
);
while
(
numOfSub
<
pSql
->
numOfSubs
)
{
while
(
numOfSub
<
pSql
->
subState
.
numOfSub
)
{
SInsertSupporter
*
pSupporter
=
calloc
(
1
,
sizeof
(
SInsertSupporter
));
if
(
pSupporter
==
NULL
)
{
goto
_error
;
}
pSupporter
->
pSql
=
pSql
;
pSupporter
->
pState
=
pState
;
pSupporter
->
index
=
numOfSub
;
SSqlObj
*
pNew
=
createSimpleSubObj
(
pSql
,
multiVnodeInsertFinalize
,
pSupporter
,
TSDB_SQL_INSERT
);
...
...
@@ -2003,12 +1981,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
numOfSub
++
;
}
else
{
tscDebug
(
"%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s"
,
pSql
,
numOfSub
,
pSql
->
numOfSubs
,
tstrerror
(
pRes
->
code
));
pSql
->
subState
.
numOfSub
,
tstrerror
(
pRes
->
code
));
goto
_error
;
}
}
if
(
numOfSub
<
pSql
->
numOfSubs
)
{
if
(
numOfSub
<
pSql
->
subState
.
numOfSub
)
{
tscError
(
"%p failed to prepare subObj structure and launch sub-insertion"
,
pSql
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
...
...
@@ -2026,7 +2004,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
_error:
taosTFree
(
pState
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -2048,7 +2025,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
int32_t
numOfRes
=
INT32_MAX
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
continue
;
}
...
...
@@ -2238,7 +2215,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
bool
allSubqueryExhausted
=
true
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
continue
;
}
...
...
@@ -2264,7 +2241,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
hasData
=
!
allSubqueryExhausted
;
}
else
{
// otherwise, in case inner join, if any subquery exhausted, query completed.
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
0
)
{
continue
;
}
...
...
src/client/src/tscUtil.c
浏览文件 @
17ccfe4a
...
...
@@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult
(
pSql
);
taosTFree
(
pSql
->
pSubs
);
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
pSql
->
self
=
0
;
tscResetSqlCmdObj
(
pCmd
,
false
);
}
static
UNUSED_FUNC
void
tscFreeSubobj
(
SSqlObj
*
pSql
)
{
if
(
pSql
->
numOfSubs
==
0
)
{
static
void
tscFreeSubobj
(
SSqlObj
*
pSql
)
{
if
(
pSql
->
subState
.
numOfSub
==
0
)
{
return
;
}
tscDebug
(
"%p start to free sub SqlObj, numOfSub:%d"
,
pSql
,
pSql
->
numOfSubs
);
tscDebug
(
"%p start to free sub SqlObj, numOfSub:%d"
,
pSql
,
pSql
->
subState
.
numOfSub
);
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
tscDebug
(
"%p free sub SqlObj:%p, index:%d"
,
pSql
,
pSql
->
pSubs
[
i
],
i
);
taos_free_result
(
pSql
->
pSubs
[
i
]);
pSql
->
pSubs
[
i
]
=
NULL
;
}
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
}
/**
...
...
@@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug
(
"%p start to free sqlObj"
,
pSql
);
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
tscFreeSubobj
(
pSql
);
tscPartiallyFreeSqlObj
(
pSql
);
pSql
->
signature
=
NULL
;
...
...
@@ -1516,13 +1518,6 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
}
bool
tscShouldFreeHeartBeat
(
SSqlObj
*
pHb
)
{
assert
(
pHb
==
pHb
->
signature
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pHb
->
cmd
,
0
);
return
pQueryInfo
->
type
==
TSDB_QUERY_TYPE_FREE_RESOURCE
;
}
/*
* the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing
...
...
@@ -2291,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
pCmd
->
command
=
TSDB_SQL_SELECT
;
tscResetForNextRetrieve
(
pRes
);
...
...
@@ -2323,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes
->
numOfTotal
=
num
;
taosTFree
(
pSql
->
pSubs
);
pSql
->
numOfSubs
=
0
;
pSql
->
subState
.
numOfSub
=
0
;
pSql
->
fp
=
fp
;
tscDebug
(
"%p try data in the next subclause:%d, total subclause:%d"
,
pSql
,
pCmd
->
clauseIndex
,
pCmd
->
numOfClause
);
...
...
go
@
8c58c512
比较
8d7bf743
...
8c58c512
Subproject commit 8
d7bf743852897110cbdcc7c4322cd7a74d4167b
Subproject commit 8
c58c512b6acda8bcdfa48fdc7140227b5221766
src/inc/taosdef.h
浏览文件 @
17ccfe4a
...
...
@@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 256
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb
#define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
#define TSDB_MAX_BYTES_PER_ROW 16384
#define TSDB_MAX_TAGS_LEN 16384
...
...
@@ -282,7 +284,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN
64
#define TSDB_SHOW_SQL_LEN
512
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_MQTT_HOSTNAME_LEN 64
...
...
src/inc/taosmsg.h
浏览文件 @
17ccfe4a
...
...
@@ -305,6 +305,8 @@ typedef struct {
char
clientVersion
[
TSDB_VERSION_LEN
];
char
msgVersion
[
TSDB_VERSION_LEN
];
char
db
[
TSDB_TABLE_FNAME_LEN
];
char
appName
[
TSDB_APPNAME_LEN
];
int32_t
pid
;
}
SCMConnectMsg
;
typedef
struct
{
...
...
@@ -746,6 +748,7 @@ typedef struct {
uint32_t
queryId
;
int64_t
useconds
;
int64_t
stime
;
uint64_t
qHandle
;
}
SQueryDesc
;
typedef
struct
{
...
...
@@ -761,8 +764,10 @@ typedef struct {
typedef
struct
{
uint32_t
connId
;
int32_t
pid
;
int32_t
numOfQueries
;
int32_t
numOfStreams
;
char
appName
[
TSDB_APPNAME_LEN
];
char
pData
[];
}
SCMHeartBeatMsg
;
...
...
src/mnode/inc/mnodeProfile.h
浏览文件 @
17ccfe4a
...
...
@@ -23,6 +23,8 @@ extern "C" {
typedef
struct
{
char
user
[
TSDB_USER_LEN
];
char
appName
[
TSDB_APPNAME_LEN
];
// app name that invokes taosc
uint32_t
pid
;
// pid of app that invokes taosc
int8_t
killed
;
uint16_t
port
;
uint32_t
ip
;
...
...
@@ -40,7 +42,7 @@ typedef struct {
int32_t
mnodeInitProfile
();
void
mnodeCleanupProfile
();
SConnObj
*
mnodeCreateConn
(
char
*
user
,
uint32_t
ip
,
uint16_t
port
);
SConnObj
*
mnodeCreateConn
(
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
);
SConnObj
*
mnodeAccquireConn
(
int32_t
connId
,
char
*
user
,
uint32_t
ip
,
uint16_t
port
);
void
mnodeReleaseConn
(
SConnObj
*
pConn
);
int32_t
mnodeSaveQueryStreamList
(
SConnObj
*
pConn
,
SCMHeartBeatMsg
*
pHBMsg
);
...
...
src/mnode/src/mnodeCluster.c
浏览文件 @
17ccfe4a
...
...
@@ -224,7 +224,8 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows,
mnodeDecClusterRef
(
pCluster
);
numOfRows
++
;
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
src/mnode/src/mnodeDb.c
浏览文件 @
17ccfe4a
...
...
@@ -760,7 +760,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
}
pShow
->
numOfReads
+=
numOfRows
;
mnodeVacuumResult
(
data
,
col
s
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumn
s
,
numOfRows
,
rows
,
pShow
);
mnodeDecUserRef
(
pUser
);
return
numOfRows
;
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
17ccfe4a
...
...
@@ -790,6 +790,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
mnodeDecDnodeRef
(
pDnode
);
}
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
@@ -891,8 +892,8 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC
mnodeDecDnodeRef
(
pDnode
);
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
@@ -992,6 +993,7 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v
}
}
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
@@ -1083,8 +1085,8 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
}
else
{
numOfRows
=
0
;
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
17ccfe4a
...
...
@@ -480,8 +480,8 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
mnodeDecMnodeRef
(
pMnode
);
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
17ccfe4a
...
...
@@ -24,15 +24,9 @@
#include "mnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeAcct.h"
#include "mnodeDnode.h"
#include "mnodeDb.h"
#include "mnodeMnode.h"
#include "mnodeProfile.h"
#include "mnodeShow.h"
#include "mnodeTable.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "mnodeWrite.h"
#define CONN_KEEP_TIME (tsShellActivityTimer * 3)
...
...
@@ -78,7 +72,7 @@ void mnodeCleanupProfile() {
}
}
SConnObj
*
mnodeCreateConn
(
char
*
user
,
uint32_t
ip
,
uint16_t
port
)
{
SConnObj
*
mnodeCreateConn
(
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
)
{
#if 0
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) {
...
...
@@ -96,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
.
ip
=
ip
,
.
port
=
port
,
.
connId
=
connId
,
.
stime
=
taosGetTimestampMs
()
.
stime
=
taosGetTimestampMs
(),
.
pid
=
pid
,
};
tstrncpy
(
connObj
.
user
,
user
,
sizeof
(
connObj
.
user
));
tstrncpy
(
connObj
.
user
,
user
,
tListLen
(
connObj
.
user
));
tstrncpy
(
connObj
.
appName
,
app
,
tListLen
(
connObj
.
appName
));
connObj
.
lastAccess
=
connObj
.
stime
;
SConnObj
*
pConn
=
taosCachePut
(
tsMnodeConnCache
,
&
connId
,
sizeof
(
int32_t
),
&
connObj
,
sizeof
(
connObj
),
CONN_KEEP_TIME
*
1000
);
...
...
@@ -183,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
// app name
pShow
->
bytes
[
cols
]
=
TSDB_APPNAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"app_name"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
// app pid
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"pid"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_IPv4ADDR_LEN
+
6
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"ip:port"
);
...
...
@@ -191,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"login
time"
);
strcpy
(
pSchema
[
cols
].
name
,
"login
_
time"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"last
access"
);
strcpy
(
pSchema
[
cols
].
name
,
"last
_
access"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
@@ -236,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pConnObj
->
user
,
pShow
->
bytes
[
cols
]);
cols
++
;
// app name
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pConnObj
->
appName
,
pShow
->
bytes
[
cols
]);
cols
++
;
// app pid
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pConnObj
->
pid
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
snprintf
(
ipStr
,
sizeof
(
ipStr
),
"%s:%u"
,
taosIpStr
(
pConnObj
->
ip
),
pConnObj
->
port
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
ipStr
,
pShow
->
bytes
[
cols
]);
...
...
@@ -254,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
5
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
return
numOfRows
;
}
...
...
@@ -299,7 +319,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow
->
bytes
[
cols
]
=
QUERY_ID_SIZE
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"query
I
d"
);
strcpy
(
pSchema
[
cols
].
name
,
"query
_i
d"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
@@ -315,9 +335,15 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
24
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"qhandle"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"created
time"
);
strcpy
(
pSchema
[
cols
].
name
,
"created
_
time"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
@@ -352,7 +378,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
SConnObj
*
pConnObj
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
ipStr
[
TSDB_IPv4ADDR_LEN
+
6
]
;
char
str
[
TSDB_IPv4ADDR_LEN
+
6
]
=
{
0
}
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
mnodeGetNextConn
(
pShow
->
pIter
,
&
pConnObj
);
...
...
@@ -362,9 +388,9 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
SQueryDesc
*
pDesc
=
pConnObj
->
pQueries
+
i
;
cols
=
0
;
snprintf
(
ipS
tr
,
QUERY_ID_SIZE
+
1
,
"%u:%u"
,
pConnObj
->
connId
,
htonl
(
pDesc
->
queryId
));
snprintf
(
s
tr
,
QUERY_ID_SIZE
+
1
,
"%u:%u"
,
pConnObj
->
connId
,
htonl
(
pDesc
->
queryId
));
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
ipS
tr
,
pShow
->
bytes
[
cols
]);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
s
tr
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -372,8 +398,15 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
snprintf
(
ipStr
,
sizeof
(
ipStr
),
"%s:%u"
,
taosIpStr
(
pConnObj
->
ip
),
pConnObj
->
port
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
ipStr
,
pShow
->
bytes
[
cols
]);
snprintf
(
str
,
tListLen
(
str
),
"%s:%u"
,
taosIpStr
(
pConnObj
->
ip
),
pConnObj
->
port
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
str
,
pShow
->
bytes
[
cols
]);
cols
++
;
char
handleBuf
[
24
]
=
{
0
};
snprintf
(
handleBuf
,
tListLen
(
handleBuf
),
"%p"
,
(
void
*
)
htobe64
(
pDesc
->
qHandle
));
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
handleBuf
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -393,8 +426,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
6
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
return
numOfRows
;
}
...
...
@@ -522,8 +554,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
8
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
return
numOfRows
;
}
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
17ccfe4a
...
...
@@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
rowsToRead
=
pShow
->
numOfRows
-
pShow
->
numOfReads
;
}
/* return no more than 100
meter
s in one round trip */
/* return no more than 100
table
s in one round trip */
if
(
rowsToRead
>
100
)
rowsToRead
=
100
;
/*
...
...
@@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t
connId
=
htonl
(
pHBMsg
->
connId
);
SConnObj
*
pConn
=
mnodeAccquireConn
(
connId
,
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
);
if
(
pConn
==
NULL
)
{
pConn
=
mnodeCreateConn
(
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
);
pHBMsg
->
pid
=
htonl
(
pHBMsg
->
pid
);
pConn
=
mnodeCreateConn
(
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
,
pHBMsg
->
pid
,
pHBMsg
->
appName
);
}
if
(
pConn
==
NULL
)
{
...
...
@@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto
connect_over
;
}
SConnObj
*
pConn
=
mnodeCreateConn
(
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
);
pConnectMsg
->
pid
=
htonl
(
pConnectMsg
->
pid
);
SConnObj
*
pConn
=
mnodeCreateConn
(
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
,
pConnectMsg
->
pid
,
pConnectMsg
->
appName
);
if
(
pConn
==
NULL
)
{
code
=
terrno
;
}
else
{
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
17ccfe4a
...
...
@@ -63,27 +63,27 @@ static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t
static
int32_t
mnodeGetStreamTableMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mnodeRetrieveStreamTables
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mnodeProcessCreateTableMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessCreateTableMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeProcessCreateSuperTableMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeProcessCreateChildTableMsg
(
SMnodeMsg
*
pMsg
);
static
void
mnodeProcessCreateChildTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mnodeProcessDropTableMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessDropTableMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeProcessDropSuperTableMsg
(
SMnodeMsg
*
pMsg
);
static
void
mnodeProcessDropSuperTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mnodeProcessDropChildTableMsg
(
SMnodeMsg
*
pMsg
);
static
void
mnodeProcessDropChildTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mnodeProcessSuperTableVgroupMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessMultiTableMetaMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessTableCfgMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessSuperTableVgroupMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeProcessMultiTableMetaMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeProcessTableCfgMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeProcessTableMetaMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessTableMetaMsg
(
SMnodeMsg
*
p
Msg
);
static
int32_t
mnodeGetSuperTableMeta
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeGetChildTableMeta
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeAutoCreateChildTable
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeProcessAlterTableMsg
(
SMnodeMsg
*
mnode
Msg
);
static
int32_t
mnodeProcessAlterTableMsg
(
SMnodeMsg
*
p
Msg
);
static
void
mnodeProcessAlterTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mnodeFindSuperTableColumnIndex
(
SSuperTableObj
*
pStable
,
char
*
colName
);
...
...
@@ -1384,9 +1384,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
5
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
mnodeDecDbRef
(
pDb
);
return
numOfRows
;
...
...
@@ -2543,6 +2542,25 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
// table uid
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BIGINT
;
strcpy
(
pSchema
[
cols
].
name
,
"uid"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"tid"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"vgId"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
...
...
@@ -2568,6 +2586,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
return
0
;
}
int32_t
cols
=
0
;
int32_t
numOfRows
=
0
;
SChildTableObj
*
pTable
=
NULL
;
SPatternCompareInfo
info
=
PATTERN_COMPARE_INFO_INITIALIZER
;
...
...
@@ -2608,8 +2627,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
continue
;
}
int32_t
cols
=
0
;
cols
=
0
;
char
*
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
tableName
,
pShow
->
bytes
[
cols
]);
...
...
@@ -2638,14 +2656,29 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
cols
++
;
// uid
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pTable
->
uid
;
cols
++
;
// tid
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pTable
->
sid
;
cols
++
;
//vgid
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pTable
->
vgId
;
cols
++
;
numOfRows
++
;
mnodeDecTableRef
(
pTable
);
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
4
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
mnodeDecDbRef
(
pDb
);
free
(
pattern
);
...
...
@@ -2843,9 +2876,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
}
pShow
->
numOfReads
+=
numOfRows
;
const
int32_t
NUM_OF_COLUMNS
=
4
;
mnodeVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
mnodeDecDbRef
(
pDb
);
return
numOfRows
;
...
...
src/mnode/src/mnodeUser.c
浏览文件 @
17ccfe4a
...
...
@@ -385,8 +385,8 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi
numOfRows
++
;
mnodeDecUserRef
(
pUser
);
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
17ccfe4a
...
...
@@ -771,7 +771,8 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
mnodeDecVgroupRef
(
pVgroup
);
numOfRows
++
;
}
mnodeVacuumResult
(
data
,
cols
,
numOfRows
,
rows
,
pShow
);
mnodeVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
mnodeDecTableRef
(
pTable
);
...
...
src/os/inc/osSemphone.h
浏览文件 @
17ccfe4a
...
...
@@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread);
int64_t
taosGetPthreadId
();
void
taosResetPthread
(
pthread_t
*
thread
);
bool
taosComparePthread
(
pthread_t
first
,
pthread_t
second
);
int32_t
taosGetPId
();
int32_t
taosGetCurrentAPPName
(
char
*
name
,
int32_t
*
len
);
#ifdef __cplusplus
}
...
...
src/os/src/detail/osSemphone.c
浏览文件 @
17ccfe4a
...
...
@@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
int64_t
taosGetPthreadId
()
{
return
(
int64_t
)
pthread_self
();
}
void
taosResetPthread
(
pthread_t
*
thread
)
{
*
thread
=
0
;
}
bool
taosComparePthread
(
pthread_t
first
,
pthread_t
second
)
{
return
first
==
second
;
}
int32_t
taosGetPId
()
{
return
getpid
();
}
int32_t
taosGetCurrentAPPName
(
char
*
name
,
int32_t
*
len
)
{
const
char
*
self
=
"/proc/self/exe"
;
char
path
[
PATH_MAX
]
=
{
0
};
if
(
readlink
(
self
,
path
,
PATH_MAX
)
<=
0
)
{
return
-
1
;
}
path
[
PATH_MAX
-
1
]
=
0
;
char
*
end
=
strrchr
(
path
,
'/'
);
if
(
end
==
NULL
)
{
return
-
1
;
}
++
end
;
strcpy
(
name
,
end
);
if
(
len
!=
NULL
)
{
*
len
=
strlen
(
name
);
}
return
0
;
}
#endif
\ No newline at end of file
src/os/src/detail/osSysinfo.c
浏览文件 @
17ccfe4a
...
...
@@ -203,7 +203,7 @@ static void taosGetSystemTimezone() {
snprintf
(
tsTimezone
,
TSDB_TIMEZONE_LEN
,
"%s (%s, %s%02d00)"
,
buf
,
tzname
[
daylight
],
tz
>=
0
?
"+"
:
"-"
,
abs
(
tz
));
// cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
u
Info
(
"timezone not configured, set to system default:%s"
,
tsTimezone
);
u
Warn
(
"timezone not configured, set to system default:%s"
,
tsTimezone
);
}
/*
...
...
@@ -235,7 +235,7 @@ static void taosGetSystemLocale() { // get and set default locale
strcpy
(
tsLocale
,
"en_US.UTF-8"
);
}
else
{
tstrncpy
(
tsLocale
,
locale
,
TSDB_LOCALE_LEN
);
u
Error
(
"locale not configured, set to system default:%s"
,
tsLocale
);
u
Warn
(
"locale not configured, set to system default:%s"
,
tsLocale
);
}
}
...
...
src/os/src/windows/wSemphone.c
浏览文件 @
17ccfe4a
...
...
@@ -36,3 +36,21 @@ int64_t taosGetPthreadId() {
bool
taosComparePthread
(
pthread_t
first
,
pthread_t
second
)
{
return
first
.
p
==
second
.
p
;
}
int32_t
taosGetPId
()
{
return
GetCurrentProcessId
();
}
int32_t
taosGetCurrentAPPName
(
char
*
name
,
int32_t
*
len
)
{
char
filepath
[
1024
]
=
{
0
};
GetModuleFileName
(
NULL
,
filepath
,
MAX_PATH
);
*
strrchr
(
filepath
,
'.'
)
=
'\0'
;
strcpy
(
name
,
filepath
);
if
(
len
!=
NULL
)
{
*
len
=
(
int32_t
)
strlen
(
filepath
);
}
return
0
;
}
\ No newline at end of file
src/util/src/tcache.c
浏览文件 @
17ccfe4a
...
...
@@ -107,12 +107,14 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
free
(
pNode
);
}
static
FORCE_INLINE
void
doRemoveElemInTrashcan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
static
FORCE_INLINE
STrashElem
*
doRemoveElemInTrashcan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
if
(
pElem
->
pData
->
signature
!=
(
uint64_t
)
pElem
->
pData
)
{
uWarn
(
"key:sig:0x%"
PRIx64
" %p data has been released, ignore"
,
pElem
->
pData
->
signature
,
pElem
->
pData
);
return
;
return
NULL
;
}
STrashElem
*
next
=
pElem
->
next
;
pCacheObj
->
numOfElemsInTrash
--
;
if
(
pElem
->
prev
)
{
pElem
->
prev
->
next
=
pElem
->
next
;
...
...
@@ -120,9 +122,15 @@ static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem
pCacheObj
->
pTrash
=
pElem
->
next
;
}
if
(
pElem
->
next
)
{
pElem
->
next
->
prev
=
pElem
->
prev
;
if
(
next
)
{
next
->
prev
=
pElem
->
prev
;
}
if
(
pCacheObj
->
numOfElemsInTrash
==
0
)
{
assert
(
pCacheObj
->
pTrash
==
NULL
);
}
return
next
;
}
static
FORCE_INLINE
void
doDestroyTrashcanElem
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
...
...
@@ -559,31 +567,30 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
if
(
pCacheObj
->
numOfElemsInTrash
==
0
)
{
if
(
pCacheObj
->
pTrash
!=
NULL
)
{
pCacheObj
->
pTrash
=
NULL
;
uError
(
"cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d"
,
pCacheObj
->
name
,
pCacheObj
->
numOfElemsInTrash
);
}
pCacheObj
->
pTrash
=
NULL
;
__cache_unlock
(
pCacheObj
);
return
;
}
STrashElem
*
pElem
=
pCacheObj
->
pTrash
;
const
char
*
stat
[]
=
{
"false"
,
"true"
};
uDebug
(
"cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s"
,
pCacheObj
->
name
,
pCacheObj
->
numOfElemsInTrash
,
(
force
?
stat
[
1
]
:
stat
[
0
]));
STrashElem
*
pElem
=
pCacheObj
->
pTrash
;
while
(
pElem
)
{
T_REF_VAL_CHECK
(
pElem
->
pData
);
if
(
pElem
->
next
==
pElem
)
{
pElem
->
next
=
NULL
;
}
assert
(
pElem
->
next
!=
pElem
&&
pElem
->
prev
!=
pElem
);
if
(
force
||
(
T_REF_VAL_GET
(
pElem
->
pData
)
==
0
))
{
uDebug
(
"cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d"
,
pCacheObj
->
name
,
pElem
->
pData
->
key
,
pElem
->
pData
->
data
,
pCacheObj
->
numOfElemsInTrash
-
1
);
STrashElem
*
p
=
pElem
;
pElem
=
pElem
->
next
;
doRemoveElemInTrashcan
(
pCacheObj
,
p
);
doDestroyTrashcanElem
(
pCacheObj
,
p
);
doRemoveElemInTrashcan
(
pCacheObj
,
pElem
);
doDestroyTrashcanElem
(
pCacheObj
,
pElem
);
pElem
=
pCacheObj
->
pTrash
;
}
else
{
pElem
=
pElem
->
next
;
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
17ccfe4a
...
...
@@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p"
,
pVnode
->
vgId
,
*
handle
,
pReadMsg
->
rpcMsg
.
handle
);
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
qKillQuery
(
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录