Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
80217813
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
80217813
编写于
3月 14, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
file rename, move query functions to different module.
上级
128d8746
变更
38
显示空白变更内容
内联
并排
Showing
38 changed file
with
182 addition
and
371 deletion
+182
-371
src/client/inc/tscSQLParser.h
src/client/inc/tscSQLParser.h
+2
-17
src/client/inc/tscSecondaryMerge.h
src/client/inc/tscSecondaryMerge.h
+2
-2
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+3
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+7
-10
src/client/src/tscCache.c
src/client/src/tscCache.c
+0
-264
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+6
-6
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+1
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+6
-14
src/client/src/tscSql.c
src/client/src/tscSql.c
+1
-1
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+0
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+91
-2
src/inc/qpercentile.h
src/inc/qpercentile.h
+1
-1
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+14
-14
src/query/inc/qextbuffer.h
src/query/inc/qextbuffer.h
+0
-0
src/query/inc/qhistogram.h
src/query/inc/qhistogram.h
+0
-0
src/query/inc/qinterpolation.h
src/query/inc/qinterpolation.h
+4
-0
src/query/inc/qresultBuf.h
src/query/inc/qresultBuf.h
+1
-1
src/query/inc/qsqlparser.h
src/query/inc/qsqlparser.h
+12
-0
src/query/inc/qsyntaxtreefunction.h
src/query/inc/qsyntaxtreefunction.h
+0
-0
src/query/src/qast.c
src/query/src/qast.c
+4
-5
src/query/src/qextbuffer.c
src/query/src/qextbuffer.c
+2
-2
src/query/src/qhistogram.c
src/query/src/qhistogram.c
+2
-2
src/query/src/qinterpolation.c
src/query/src/qinterpolation.c
+3
-3
src/query/src/qparserImpl.c
src/query/src/qparserImpl.c
+0
-0
src/query/src/qpercentile.c
src/query/src/qpercentile.c
+2
-3
src/query/src/qresultBuf.c
src/query/src/qresultBuf.c
+2
-2
src/query/src/qsyntaxtreefunction.c
src/query/src/qsyntaxtreefunction.c
+1
-1
src/query/src/qtokenizer.c
src/query/src/qtokenizer.c
+0
-0
src/util/inc/tcache.h
src/util/inc/tcache.h
+0
-0
src/util/src/tcache.c
src/util/src/tcache.c
+0
-0
src/vnode/detail/inc/vnodeRead.h
src/vnode/detail/inc/vnodeRead.h
+2
-2
src/vnode/detail/src/vnodeQueryImpl.c
src/vnode/detail/src/vnodeQueryImpl.c
+2
-2
src/vnode/detail/src/vnodeQueryProcess.c
src/vnode/detail/src/vnodeQueryProcess.c
+1
-1
src/vnode/detail/src/vnodeRead.c
src/vnode/detail/src/vnodeRead.c
+1
-1
src/vnode/detail/src/vnodeSupertableQuery.c
src/vnode/detail/src/vnodeSupertableQuery.c
+3
-3
src/vnode/detail/src/vnodeTagMgmt.c
src/vnode/detail/src/vnodeTagMgmt.c
+3
-3
未找到文件。
src/client/inc/tscSQLParser.h
浏览文件 @
80217813
...
...
@@ -20,24 +20,9 @@
extern
"C"
{
#endif
#include "taos.h"
#include "taosmsg.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tvariant.h"
#include "qsqlparser.h"
#include "tsclient.h"
enum
{
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_VALUE
=
0x4
,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pSql
);
int32_t
tscToSQLCmd
(
struct
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
#ifdef __cplusplus
}
...
...
src/client/inc/tscSecondaryMerge.h
浏览文件 @
80217813
...
...
@@ -20,9 +20,9 @@
extern
"C"
{
#endif
#include "qextbuffer.h"
#include "qinterpolation.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tinterpolation.h"
#include "tlosertree.h"
#include "tsclient.h"
...
...
src/client/inc/tscUtil.h
浏览文件 @
80217813
...
...
@@ -24,10 +24,10 @@ extern "C" {
* @date 2018/09/30
*/
#include "os.h"
#include "textbuffer.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#include "taosdef.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE))
...
...
@@ -252,6 +252,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void
tscAsyncQuerySingleRowForNextVnode
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
void
tscTryQueryNextClause
(
SSqlObj
*
pSql
,
void
(
*
queryFp
)());
int32_t
launchMultivnodeInsert
(
SSqlObj
*
pSql
);
#ifdef __cplusplus
}
...
...
src/client/inc/tsclient.h
浏览文件 @
80217813
...
...
@@ -32,6 +32,7 @@ extern "C" {
#include "tutil.h"
#include "trpc.h"
#include "qsqltype.h"
#include "qsqlparser.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
...
...
@@ -308,14 +309,14 @@ typedef struct _tsc_obj {
char
sversion
[
TSDB_VERSION_LEN
];
char
writeAuth
:
1
;
char
superAuth
:
1
;
struct
_sql_o
bj
*
pSql
;
struct
_sql_o
bj
*
pHb
;
struct
_sql_o
bj
*
sqlList
;
struct
SSqlO
bj
*
pSql
;
struct
SSqlO
bj
*
pHb
;
struct
SSqlO
bj
*
sqlList
;
struct
_sstream
*
streamList
;
pthread_mutex_t
mutex
;
}
STscObj
;
typedef
struct
_sql_o
bj
{
typedef
struct
SSqlO
bj
{
void
*
signature
;
STscObj
*
pTscObj
;
void
(
*
fp
)();
...
...
@@ -340,8 +341,8 @@ typedef struct _sql_obj {
uint8_t
numOfSubs
;
char
*
asyncTblPos
;
void
*
pTableHashList
;
struct
_sql_o
bj
**
pSubs
;
struct
_sql_o
bj
*
prev
,
*
next
;
struct
SSqlO
bj
**
pSubs
;
struct
SSqlO
bj
*
prev
,
*
next
;
}
SSqlObj
;
typedef
struct
_sstream
{
...
...
@@ -442,9 +443,6 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t
tscInvalidSQLErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
// transfer SSqlInfo to SqlCmd struct
int32_t
tscToSQLCmd
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
void
tscQueueAsyncFreeResult
(
SSqlObj
*
pSql
);
extern
void
*
pVnodeConn
;
...
...
@@ -453,7 +451,6 @@ extern void * tscCacheHandle;
extern
int32_t
globalCode
;
extern
int
slaveIndex
;
extern
void
*
tscTmr
;
extern
void
*
tscConnCache
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[];
extern
int
tsInsertHeadSize
;
...
...
src/client/src/tscCache.c
已删除
100644 → 0
浏览文件 @
128d8746
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmempool.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
typedef
struct
_c_hash_t
{
uint32_t
ip
;
uint16_t
port
;
struct
_c_hash_t
*
prev
;
struct
_c_hash_t
*
next
;
void
*
data
;
uint64_t
time
;
}
SConnHash
;
typedef
struct
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
int
maxSessions
;
int
total
;
int
*
count
;
int64_t
keepTimer
;
pthread_mutex_t
mutex
;
void
(
*
cleanFp
)(
void
*
);
void
*
tmrCtrl
;
void
*
pTimer
;
}
SConnCache
;
int
taosHashConn
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
SConnCache
*
pObj
=
(
SConnCache
*
)
handle
;
int
hash
=
0
;
// size_t user_len = strlen(user);
hash
=
ip
>>
16
;
hash
+=
(
unsigned
short
)(
ip
&
0xFFFF
);
hash
+=
port
;
while
(
*
user
!=
'\0'
)
{
hash
+=
*
user
;
user
++
;
}
hash
=
hash
%
pObj
->
maxSessions
;
return
hash
;
}
void
taosRemoveExpiredNodes
(
SConnCache
*
pObj
,
SConnHash
*
pNode
,
int
hash
,
uint64_t
time
)
{
if
(
pNode
==
NULL
)
return
;
if
(
time
<
pObj
->
keepTimer
+
pNode
->
time
)
return
;
SConnHash
*
pPrev
=
pNode
->
prev
,
*
pNext
;
while
(
pNode
)
{
(
*
pObj
->
cleanFp
)(
pNode
->
data
);
pNext
=
pNode
->
next
;
pObj
->
total
--
;
pObj
->
count
[
hash
]
--
;
tscTrace
(
"%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d"
,
pNode
->
data
,
pNode
->
ip
,
pNode
->
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
taosMemPoolFree
(
pObj
->
connHashMemPool
,
(
char
*
)
pNode
);
pNode
=
pNext
;
}
if
(
pPrev
)
pPrev
->
next
=
NULL
;
else
pObj
->
connHashList
[
hash
]
=
NULL
;
}
void
*
taosAddConnIntoCache
(
void
*
handle
,
void
*
data
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
uint64_t
time
=
taosGetTimestampMs
();
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
if
(
data
==
NULL
)
{
tscTrace
(
"data:%p ip:%p:%d not valid, not added in cache"
,
data
,
ip
,
port
);
return
NULL
;
}
hash
=
taosHashConn
(
pObj
,
ip
,
port
,
user
);
pNode
=
(
SConnHash
*
)
taosMemPoolMalloc
(
pObj
->
connHashMemPool
);
pNode
->
ip
=
ip
;
pNode
->
port
=
port
;
pNode
->
data
=
data
;
pNode
->
prev
=
NULL
;
pNode
->
time
=
time
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
->
next
=
pObj
->
connHashList
[
hash
];
if
(
pObj
->
connHashList
[
hash
]
!=
NULL
)
(
pObj
->
connHashList
[
hash
])
->
prev
=
pNode
;
pObj
->
connHashList
[
hash
]
=
pNode
;
pObj
->
total
++
;
pObj
->
count
[
hash
]
++
;
taosRemoveExpiredNodes
(
pObj
,
pNode
->
next
,
hash
,
time
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
tscTrace
(
"%p ip:0x%x:%hu:%d:%p added, connections in cache:%d"
,
data
,
ip
,
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
return
pObj
;
}
void
taosCleanConnCache
(
void
*
handle
,
void
*
tmrId
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
if
(
pObj
->
pTimer
!=
tmrId
)
return
;
uint64_t
time
=
taosGetTimestampMs
();
for
(
hash
=
0
;
hash
<
pObj
->
maxSessions
;
++
hash
)
{
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
=
pObj
->
connHashList
[
hash
];
taosRemoveExpiredNodes
(
pObj
,
pNode
,
hash
,
time
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset
(
taosCleanConnCache
,
pObj
->
keepTimer
*
2
,
pObj
,
pObj
->
tmrCtrl
,
&
pObj
->
pTimer
);
}
void
*
taosGetConnFromCache
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
void
*
pData
=
NULL
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
uint64_t
time
=
taosGetTimestampMs
();
hash
=
taosHashConn
(
pObj
,
ip
,
port
,
user
);
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
=
pObj
->
connHashList
[
hash
];
while
(
pNode
)
{
if
(
time
>=
pObj
->
keepTimer
+
pNode
->
time
)
{
taosRemoveExpiredNodes
(
pObj
,
pNode
,
hash
,
time
);
pNode
=
NULL
;
break
;
}
if
(
pNode
->
ip
==
ip
&&
pNode
->
port
==
port
)
break
;
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
taosRemoveExpiredNodes
(
pObj
,
pNode
->
next
,
hash
,
time
);
if
(
pNode
->
prev
)
{
pNode
->
prev
->
next
=
pNode
->
next
;
}
else
{
pObj
->
connHashList
[
hash
]
=
pNode
->
next
;
}
if
(
pNode
->
next
)
{
pNode
->
next
->
prev
=
pNode
->
prev
;
}
pData
=
pNode
->
data
;
taosMemPoolFree
(
pObj
->
connHashMemPool
,
(
char
*
)
pNode
);
pObj
->
total
--
;
pObj
->
count
[
hash
]
--
;
}
pthread_mutex_unlock
(
&
pObj
->
mutex
);
if
(
pData
)
{
tscTrace
(
"%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d"
,
pData
,
ip
,
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
}
return
pData
;
}
void
*
taosOpenConnCache
(
int
maxSessions
,
void
(
*
cleanFp
)(
void
*
),
void
*
tmrCtrl
,
int64_t
keepTimer
)
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
SConnCache
*
pObj
;
connHashMemPool
=
taosMemPoolInit
(
maxSessions
,
sizeof
(
SConnHash
));
if
(
connHashMemPool
==
0
)
return
NULL
;
connHashList
=
calloc
(
sizeof
(
SConnHash
*
),
maxSessions
);
if
(
connHashList
==
0
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
return
NULL
;
}
pObj
=
malloc
(
sizeof
(
SConnCache
));
if
(
pObj
==
NULL
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
free
(
connHashList
);
return
NULL
;
}
memset
(
pObj
,
0
,
sizeof
(
SConnCache
));
pObj
->
count
=
calloc
(
sizeof
(
int
),
maxSessions
);
pObj
->
total
=
0
;
pObj
->
keepTimer
=
keepTimer
;
pObj
->
maxSessions
=
maxSessions
;
pObj
->
connHashMemPool
=
connHashMemPool
;
pObj
->
connHashList
=
connHashList
;
pObj
->
cleanFp
=
cleanFp
;
pObj
->
tmrCtrl
=
tmrCtrl
;
taosTmrReset
(
taosCleanConnCache
,
pObj
->
keepTimer
*
2
,
pObj
,
pObj
->
tmrCtrl
,
&
pObj
->
pTimer
);
pthread_mutex_init
(
&
pObj
->
mutex
,
NULL
);
return
pObj
;
}
void
taosCloseConnCache
(
void
*
handle
)
{
SConnCache
*
pObj
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
taosTmrStopA
(
&
(
pObj
->
pTimer
));
if
(
pObj
->
connHashMemPool
)
taosMemPoolCleanUp
(
pObj
->
connHashMemPool
);
tfree
(
pObj
->
connHashList
);
tfree
(
pObj
->
count
)
pthread_mutex_unlock
(
&
pObj
->
mutex
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
memset
(
pObj
,
0
,
sizeof
(
SConnCache
));
free
(
pObj
);
}
src/client/src/tscFunctionImpl.c
浏览文件 @
80217813
...
...
@@ -14,20 +14,20 @@
*/
#include "os.h"
#include "qextbuffer.h"
#include "qhistogram.h"
#include "qinterpolation.h"
#include "qpercentile.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "thistogram.h"
#include "tinterpolation.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h"
#include "tscompression.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "taosdef.h"
#include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
...
...
src/client/src/tscLocal.c
浏览文件 @
80217813
...
...
@@ -21,7 +21,7 @@
#include "tsclient.h"
#include "taosdef.h"
#include "
t
extbuffer.h"
#include "
q
extbuffer.h"
#include "tscSecondaryMerge.h"
#include "tschemautil.h"
#include "tsocket.h"
...
...
src/client/src/tscParseInsert.c
浏览文件 @
80217813
...
...
@@ -1314,11 +1314,10 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
* the error handle callback function can rightfully restore the user defined function (fp)
*/
if
(
pSql
->
fp
!=
NULL
&&
multiVnodeInsertion
)
{
assert
(
pSql
->
fetchFp
==
NULL
);
pSql
->
fetchFp
=
pSql
->
fp
;
// replace user defined callback function with multi-insert proxy function
pSql
->
fp
=
tscAsyncInsertMultiVnodesProxy
;
pSql
->
fp
=
launchMultivnodeInsert
;
}
ret
=
tsParseInsertSql
(
pSql
);
...
...
src/client/src/tscSQLParser.c
浏览文件 @
80217813
...
...
@@ -28,6 +28,7 @@
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttokendef.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
...
...
@@ -59,7 +60,7 @@ static int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pD
static
void
getColumnName
(
tSQLExprItem
*
pItem
,
char
*
resultFieldName
,
int32_t
nameLength
);
static
void
getRevisedName
(
char
*
resultFieldName
,
int32_t
functionId
,
int32_t
maxLen
,
char
*
columnName
);
static
int32_t
addExprAndResultField
(
SQueryInfo
*
pQueryInfo
,
int32_t
colIdx
,
tSQLExprItem
*
pItem
,
bool
isResultColumn
);
static
int32_t
addExprAndResultField
(
SQueryInfo
*
pQueryInfo
,
int32_t
colIdx
,
tSQLExprItem
*
pItem
,
bool
finalResult
);
static
int32_t
insertResultField
(
SQueryInfo
*
pQueryInfo
,
int32_t
outputIndex
,
SColumnList
*
pIdList
,
int16_t
bytes
,
int8_t
type
,
char
*
fieldName
,
SSqlExpr
*
pSqlExpr
);
static
int32_t
changeFunctionID
(
int32_t
optr
,
int16_t
*
functionId
);
...
...
src/client/src/tscServer.c
浏览文件 @
80217813
...
...
@@ -615,21 +615,13 @@ int tscProcessSql(SSqlObj *pSql) {
* when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
* which causes deadlock. So we keep it as local variable.
*/
void
*
fp
=
pSql
->
fp
;
if
(
tscLaunchSTableSubqueries
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
if
(
fp
==
NULL
)
{
tsem_post
(
&
pSql
->
emptyRspSem
);
tsem_wait
(
&
pSql
->
rspSem
);
tsem_post
(
&
pSql
->
emptyRspSem
);
// set the command flag must be after the semaphore been correctly set.
pSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_METRIC
;
}
return
pSql
->
res
.
code
;
}
else
if
(
pSql
->
fp
==
launchMultivnodeInsert
)
{
// multi-vnodes insertion
launchMultivnodeInsert
(
pSql
);
return
pSql
->
res
.
code
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
80217813
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tast.h>
#include "hash.h"
#include "os.h"
#include "tcache.h"
...
...
@@ -30,6 +29,7 @@
#include "tsocket.h"
#include "ttimer.h"
#include "tutil.h"
#include "ttokendef.h"
TAOS
*
taos_connect_imp
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
int
),
void
*
param
,
void
**
taos
)
{
...
...
src/client/src/tscSystem.c
浏览文件 @
80217813
...
...
@@ -38,7 +38,6 @@ int initialized = 0;
int
slaveIndex
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscConnCache
;
void
*
tscCheckDiskUsageTmr
;
int
tsInsertHeadSize
;
...
...
@@ -188,8 +187,6 @@ void taos_init_imp() {
if
(
tscCacheHandle
==
NULL
)
tscCacheHandle
=
taosCacheInit
(
tscTmr
,
refreshTime
);
tscConnCache
=
taosOpenConnCache
(
tsMaxMeterConnections
*
2
,
NULL
/*taosCloseRpcConn*/
,
tscTmr
,
tsShellActivityTimer
*
1000
);
initialized
=
1
;
tscTrace
(
"client is initialized successfully"
);
tsInsertHeadSize
=
tsRpcHeadSize
+
sizeof
(
SShellSubmitMsg
);
...
...
src/client/src/tscUtil.c
浏览文件 @
80217813
...
...
@@ -2105,7 +2105,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
void
tscDoQuery
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
void
*
fp
=
pSql
->
fp
;
pSql
->
res
.
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2121,7 +2120,6 @@ void tscDoQuery(SSqlObj* pSql) {
}
else
{
// pSql may be released in this function if it is a async insertion.
tscProcessSql
(
pSql
);
if
(
NULL
==
fp
)
tscProcessMultiVnodesInsert
(
pSql
);
}
}
}
...
...
@@ -2321,3 +2319,94 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
tscProcessSql
(
pSql
);
}
}
typedef
struct
SinsertSupporter
{
SSubqueryState
*
pState
;
SSqlObj
*
pSql
;
}
SinsertSupporter
;
void
multiVnodeInsertMerge
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SinsertSupporter
*
pSupporter
=
(
SinsertSupporter
*
)
param
;
SSqlObj
*
pParentObj
=
pSupporter
->
pSql
;
SSqlCmd
*
pParentCmd
=
&
pParentObj
->
cmd
;
SSubqueryState
*
pState
=
pSupporter
->
pState
;
int32_t
total
=
pState
->
numOfTotal
;
// increase the total inserted rows
if
(
numOfRows
>
0
)
{
pParentObj
->
res
.
numOfRows
+=
numOfRows
;
}
int32_t
completed
=
atomic_add_fetch_32
(
&
pState
->
numOfCompleted
,
1
);
if
(
completed
<
total
)
{
return
;
}
tscTrace
(
"%p Async insertion completed, total inserted:%d"
,
pParentObj
,
pParentObj
->
res
.
numOfRows
);
// release data block data
pParentCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pParentCmd
->
pDataBlocks
);
// restore user defined fp
pParentObj
->
fp
=
pParentObj
->
fetchFp
;
// all data has been sent to vnode, call user function
(
*
pParentObj
->
fp
)(
pParentObj
->
param
,
tres
,
numOfRows
);
}
int32_t
launchMultivnodeInsert
(
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pRes
->
qhandle
=
1
;
// hack the qhandle check
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
pSql
->
pSubs
=
calloc
(
pDataBlocks
->
nSize
,
POINTER_BYTES
);
pSql
->
numOfSubs
=
pDataBlocks
->
nSize
;
assert
(
pDataBlocks
->
nSize
>
0
);
tscTrace
(
"%p submit data to %d vnode(s)"
,
pSql
,
pDataBlocks
->
nSize
);
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pRes
->
code
=
TSDB_CODE_SUCCESS
;
int32_t
i
=
0
;
for
(;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SinsertSupporter
*
pSupporter
=
calloc
(
1
,
sizeof
(
SinsertSupporter
));
pSupporter
->
pSql
=
pSql
;
pSupporter
->
pState
=
pState
;
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
0
,
multiVnodeInsertMerge
,
pSupporter
,
NULL
);
if
(
pNew
==
NULL
)
{
tscError
(
"%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
break
;
}
pSql
->
pSubs
[
i
]
=
pNew
;
tscTrace
(
"%p sub:%p create subObj success. orderOfSub:%d"
,
pSql
,
pNew
,
i
);
}
if
(
i
<
pSql
->
numOfSubs
)
{
tscError
(
"%p failed to prepare subObj structure and launch sub-insertion"
,
pSql
);
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
return
pRes
->
code
;
// free all allocated resource
}
for
(
int32_t
j
=
0
;
j
<
pSql
->
numOfSubs
;
++
j
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
j
];
pSub
->
cmd
.
command
=
TSDB_SQL_INSERT
;
int32_t
code
=
tscCopyDataBlockToPayload
(
pSub
,
pDataBlocks
->
pData
[
j
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d"
,
pSql
,
j
,
pDataBlocks
->
nSize
,
code
);
}
tscTrace
(
"%p sub:%p launch sub insert, orderOfSub:%d"
,
pSql
,
pSub
,
j
);
tscProcessSql
(
pSub
);
}
return
TSDB_CODE_SUCCESS
;
}
src/inc/
t
percentile.h
→
src/inc/
q
percentile.h
浏览文件 @
80217813
...
...
@@ -16,7 +16,7 @@
#ifndef TDENGINE_TPERCENTILE_H
#define TDENGINE_TPERCENTILE_H
#include "
t
extbuffer.h"
#include "
q
extbuffer.h"
typedef
struct
MinMaxEntry
{
union
{
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
80217813
...
...
@@ -14,22 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h"
#include "mnode.h"
#include "mgmtTable.h"
#include "mgmtAcct.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
...
...
@@ -37,9 +26,20 @@
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "mnode.h"
#include "os.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tast.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h"
extern
void
*
tsNormalTableSdb
;
extern
void
*
tsChildTableSdb
;
...
...
src/query/inc/
t
extbuffer.h
→
src/query/inc/
q
extbuffer.h
浏览文件 @
80217813
文件已移动
src/query/inc/
t
histogram.h
→
src/query/inc/
q
histogram.h
浏览文件 @
80217813
文件已移动
src/query/inc/
t
interpolation.h
→
src/query/inc/
q
interpolation.h
浏览文件 @
80217813
...
...
@@ -20,6 +20,10 @@
extern
"C"
{
#endif
#include "os.h"
#include "taosdef.h"
#include "qextbuffer.h"
typedef
struct
SInterpolationInfo
{
int64_t
startTimestamp
;
int32_t
order
;
// order [asc/desc]
...
...
src/query/inc/
t
resultBuf.h
→
src/query/inc/
q
resultBuf.h
浏览文件 @
80217813
...
...
@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "
t
extbuffer.h"
#include "
q
extbuffer.h"
typedef
struct
SIDList
{
uint32_t
alloc
;
...
...
src/query/inc/qsqlparser.h
浏览文件 @
80217813
...
...
@@ -329,6 +329,18 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *pToken);
void
*
ParseAlloc
(
void
*
(
*
mallocProc
)(
size_t
));
enum
{
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_VALUE
=
0x4
,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pSql
);
#ifdef __cplusplus
}
#endif
...
...
src/query/inc/
tscS
yntaxtreefunction.h
→
src/query/inc/
qs
yntaxtreefunction.h
浏览文件 @
80217813
文件已移动
src/query/src/
tscA
st.c
→
src/query/src/
qa
st.c
浏览文件 @
80217813
...
...
@@ -14,18 +14,17 @@
*/
#include "os.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tscSyntaxtreefunction.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "qsqlparser.h"
/*
*
...
...
@@ -648,7 +647,7 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults
/*
* traverse the result and apply the function to each item to check if the item is qualified or not
*/
static
void
tSQLListTraverseOnResult
(
struct
tSQLBinaryExpr
*
pExpr
,
__result_filter_fn_t
fp
,
tQueryResultset
*
pResult
)
{
static
UNUSED_FUNC
void
tSQLListTraverseOnResult
(
struct
tSQLBinaryExpr
*
pExpr
,
__result_filter_fn_t
fp
,
tQueryResultset
*
pResult
)
{
assert
(
pExpr
->
pLeft
->
nodeType
==
TSQL_NODE_COL
&&
pExpr
->
pRight
->
nodeType
==
TSQL_NODE_VALUE
);
// brutal force scan the result list and check for each item in the list
...
...
@@ -705,7 +704,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
* @param pSchema tag schemas
* @param fp filter callback function
*/
static
void
tSQLBinaryTraverseOnResult
(
tSQLBinaryExpr
*
pExpr
,
tQueryResultset
*
pResult
,
SBinaryFilterSupp
*
param
)
{
static
UNUSED_FUNC
void
tSQLBinaryTraverseOnResult
(
tSQLBinaryExpr
*
pExpr
,
tQueryResultset
*
pResult
,
SBinaryFilterSupp
*
param
)
{
int32_t
n
=
0
;
for
(
int32_t
i
=
0
;
i
<
pResult
->
num
;
++
i
)
{
void
*
pItem
=
pResult
->
pRes
[
i
];
...
...
src/query/src/
t
extbuffer.c
→
src/query/src/
q
extbuffer.c
浏览文件 @
80217813
...
...
@@ -12,14 +12,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qextbuffer.h"
#include "os.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tlog.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "taosdef.h"
#include "tutil.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
...
...
src/query/src/
t
histogram.c
→
src/query/src/
q
histogram.c
浏览文件 @
80217813
...
...
@@ -14,10 +14,10 @@
*/
#include "os.h"
#include "qhistogram.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "thistogram.h"
#include "tlosertree.h"
#include "taosdef.h"
/**
*
...
...
src/query/src/
t
interpolation.c
→
src/query/src/
q
interpolation.c
浏览文件 @
80217813
...
...
@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qinterpolation.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tinterpolation.h"
#include "tsqlfunction.h"
#include "taosdef.h"
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
...
...
src/query/src/
tscSQLP
arserImpl.c
→
src/query/src/
qp
arserImpl.c
浏览文件 @
80217813
文件已移动
src/query/src/
t
percentile.c
→
src/query/src/
q
percentile.c
浏览文件 @
80217813
...
...
@@ -15,11 +15,10 @@
#include "os.h"
#include "
taosmsg
.h"
#include "
qpercentile
.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
#include "taosdef.h"
#include "tpercentile.h"
tExtMemBuffer
*
releaseBucketsExceptFor
(
tMemBucket
*
pMemBucket
,
int16_t
segIdx
,
int16_t
slotIdx
)
{
tExtMemBuffer
*
pBuffer
=
NULL
;
...
...
src/query/src/
t
resultBuf.c
→
src/query/src/
q
resultBuf.c
浏览文件 @
80217813
#include "qresultBuf.h"
#include "hash.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "textbuffer.h"
#include "tlog.h"
#include "tsqlfunction.h"
#include "tresultBuf.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
...
...
src/query/src/
tscS
yntaxtreefunction.c
→
src/query/src/
qs
yntaxtreefunction.c
浏览文件 @
80217813
...
...
@@ -15,7 +15,7 @@
#include "os.h"
#include "
tscS
yntaxtreefunction.h"
#include "
qs
yntaxtreefunction.h"
#include "taosdef.h"
#include "tutil.h"
...
...
src/query/src/
t
tokenizer.c
→
src/query/src/
q
tokenizer.c
浏览文件 @
80217813
文件已移动
src/
query
/inc/tcache.h
→
src/
util
/inc/tcache.h
浏览文件 @
80217813
文件已移动
src/
client
/src/tcache.c
→
src/
util
/src/tcache.c
浏览文件 @
80217813
文件已移动
src/vnode/detail/inc/vnodeRead.h
浏览文件 @
80217813
...
...
@@ -21,9 +21,9 @@ extern "C" {
#endif
#include "os.h"
#include "
t
resultBuf.h"
#include "
q
resultBuf.h"
#include "
t
interpolation.h"
#include "
q
interpolation.h"
#include "vnodeTagMgmt.h"
/*
...
...
src/vnode/detail/src/vnodeQueryImpl.c
浏览文件 @
80217813
...
...
@@ -16,11 +16,11 @@
#include "hash.h"
#include "hashfunc.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "ttime.h"
#include "
t
interpolation.h"
#include "
q
interpolation.h"
#include "tscJoinProcess.h"
#include "tscSecondaryMerge.h"
#include "tscompression.h"
...
...
src/vnode/detail/src/vnodeQueryProcess.c
浏览文件 @
80217813
...
...
@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tscJoinProcess.h"
#include "ttime.h"
#include "vnode.h"
...
...
src/vnode/detail/src/vnodeRead.c
浏览文件 @
80217813
...
...
@@ -19,9 +19,9 @@
#include "hash.h"
#include "hashfunc.h"
#include "ihash.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "tscJoinProcess.h"
#include "tscompression.h"
#include "vnode.h"
...
...
src/vnode/detail/src/vnodeSupertableQuery.c
浏览文件 @
80217813
...
...
@@ -14,12 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "textbuffer.h"
#include "os.h"
#include "qextbuffer.h"
#include "tast.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
#include "tast.h"
//#include "vnodeTagMgmt.h"
typedef
struct
SSyntaxTreeFilterSupporter
{
...
...
src/vnode/detail/src/vnodeTagMgmt.c
浏览文件 @
80217813
...
...
@@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "tlog.h"
#include "tutil.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tast.h"
#include "tlog.h"
#include "tutil.h"
#include "vnodeTagMgmt.h"
#define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[getColumnModelOffset(sc, col)])))
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录