Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
爱划水de鲸鱼哥~
TDengine
提交
e15aa9b1
T
TDengine
项目概览
爱划水de鲸鱼哥~
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e15aa9b1
编写于
12月 31, 2019
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add the union support in sql parser. #1032. [TBASE-1140]
上级
c18b6b1b
变更
20
展开全部
显示空白变更内容
内联
并排
Showing
20 changed file
with
2593 addition
and
2302 deletion
+2593
-2302
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+39
-34
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+14
-10
src/client/src/sql.c
src/client/src/sql.c
+522
-515
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+41
-23
src/client/src/tscJoinProcess.c
src/client/src/tscJoinProcess.c
+92
-65
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+72
-52
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+18
-18
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+2
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+882
-857
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+196
-169
src/client/src/tscServer.c
src/client/src/tscServer.c
+281
-211
src/client/src/tscSql.c
src/client/src/tscSql.c
+49
-38
src/client/src/tscStream.c
src/client/src/tscStream.c
+63
-53
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+253
-190
src/inc/sql.y
src/inc/sql.y
+2
-1
src/inc/tsdb.h
src/inc/tsdb.h
+1
-0
src/inc/tsqldef.h
src/inc/tsqldef.h
+60
-60
src/inc/tutil.h
src/inc/tutil.h
+2
-2
src/util/src/hash.c
src/util/src/hash.c
+2
-0
src/util/src/tutil.c
src/util/src/tutil.c
+2
-2
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
e15aa9b1
...
...
@@ -29,9 +29,9 @@ extern "C" {
#include "tsclient.h"
#include "tsdb.h"
#define UTIL_METER_IS_
METRIC
(metaInfo) \
#define UTIL_METER_IS_
SUPERTABLE
(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_
METRIC
(metaInfo)))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_
SUPERTABLE
(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
...
...
@@ -95,23 +95,23 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
* @param pSql sql object
* @return
*/
bool
tscIsPointInterpQuery
(
S
SqlCmd
*
pCmd
);
bool
tscIsTWAQuery
(
S
SqlCmd
*
pCmd
);
bool
tscProjectionQueryOnMetric
(
SSqlCmd
*
pCmd
);
bool
tscProjectionQueryOnTable
(
S
SqlCmd
*
pCmd
);
bool
tscIsPointInterpQuery
(
S
QueryInfo
*
pQueryInfo
);
bool
tscIsTWAQuery
(
S
QueryInfo
*
pQueryInfo
);
bool
tscProjectionQueryOnMetric
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
bool
tscProjectionQueryOnTable
(
S
QueryInfo
*
pQueryInfo
);
bool
tscIsTwoStageMergeMetricQuery
(
SSqlCmd
*
pCmd
);
bool
tscQueryOnMetric
(
SSqlCmd
*
pCmd
);
bool
tscQueryMetricTags
(
S
SqlCmd
*
pCmd
);
bool
tscQueryMetricTags
(
S
QueryInfo
*
pQueryInfo
);
bool
tscIsSelectivityWithTagQuery
(
SSqlCmd
*
pCmd
);
void
tscAddSpecialColumnForSelect
(
S
SqlCmd
*
pCmd
,
int32_t
outputColIndex
,
int16_t
functionId
,
SColumnIndex
*
pIndex
,
void
tscAddSpecialColumnForSelect
(
S
QueryInfo
*
pQueryInfo
,
int32_t
outputColIndex
,
int16_t
functionId
,
SColumnIndex
*
pIndex
,
SSchema
*
pColSchema
,
int16_t
isTag
);
void
addRequiredTagColumn
(
S
SqlCmd
*
pCmd
,
int32_t
tagColIndex
,
int32_t
tableIndex
);
void
addRequiredTagColumn
(
S
QueryInfo
*
pQueryInfo
,
int32_t
tagColIndex
,
int32_t
tableIndex
);
int32_t
setMeterID
(
SSqlObj
*
pSql
,
SSQLToken
*
pzTableName
,
int32_t
tableIndex
);
void
tscClearInterpInfo
(
S
SqlCmd
*
pCmd
);
int32_t
setMeterID
(
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
SSQLToken
*
pzTableName
,
int32_t
tableIndex
);
void
tscClearInterpInfo
(
S
QueryInfo
*
pQueryInfo
);
bool
tscIsInsertOrImportData
(
char
*
sqlstr
);
...
...
@@ -125,30 +125,30 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
void
tscFieldInfoSetValue
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
int8_t
type
,
const
char
*
name
,
int16_t
bytes
);
void
tscFieldInfoUpdateVisible
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
bool
visible
);
void
tscFieldInfoCalOffset
(
S
SqlCmd
*
pCmd
);
void
tscFieldInfoUpdateOffset
(
S
SqlCmd
*
pCmd
);
void
tscFieldInfoCalOffset
(
S
QueryInfo
*
pQueryInfo
);
void
tscFieldInfoUpdateOffset
(
S
QueryInfo
*
pQueryInfo
);
void
tscFieldInfoCopy
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
,
const
int32_t
*
indexList
,
int32_t
size
);
void
tscFieldInfoCopyAll
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
);
TAOS_FIELD
*
tscFieldInfoGetField
(
S
SqlCmd
*
pCmd
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
S
SqlCmd
*
pCmd
,
int32_t
index
);
int32_t
tscGetResRowLength
(
S
SqlCmd
*
pCmd
);
TAOS_FIELD
*
tscFieldInfoGetField
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
);
int32_t
tscGetResRowLength
(
S
QueryInfo
*
pQueryInfo
);
void
tscClearFieldInfo
(
SFieldInfo
*
pFieldInfo
);
int32_t
tscNumOfFields
(
S
SqlCmd
*
pCmd
);
int32_t
tscNumOfFields
(
S
QueryInfo
*
pQueryInfo
);
void
addExprParams
(
SSqlExpr
*
pExpr
,
char
*
argument
,
int32_t
type
,
int32_t
bytes
,
int16_t
tableIndex
);
SSqlExpr
*
tscSqlExprInsert
(
S
SqlCmd
*
pCmd
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
SSqlExpr
*
tscSqlExprInsert
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
interSize
);
SSqlExpr
*
tscSqlExprInsertEmpty
(
S
SqlCmd
*
pCmd
,
int32_t
index
,
int16_t
functionId
);
SSqlExpr
*
tscSqlExprInsertEmpty
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
);
SSqlExpr
*
tscSqlExprUpdate
(
S
SqlCmd
*
pCmd
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
SSqlExpr
*
tscSqlExprUpdate
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
int16_t
size
);
SSqlExpr
*
tscSqlExprGet
(
S
SqlCmd
*
pCmd
,
int32_t
index
);
SSqlExpr
*
tscSqlExprGet
(
S
QueryInfo
*
pQueryInfo
,
int32_t
index
);
void
tscSqlExprCopy
(
SSqlExprInfo
*
dst
,
const
SSqlExprInfo
*
src
,
uint64_t
uid
);
SColumnBase
*
tscColumnBaseInfoInsert
(
S
SqlCmd
*
pCmd
,
SColumnIndex
*
colIndex
);
SColumnBase
*
tscColumnBaseInfoInsert
(
S
QueryInfo
*
pQueryInfo
,
SColumnIndex
*
colIndex
);
void
tscColumnFilterInfoCopy
(
SColumnFilterInfo
*
dst
,
const
SColumnFilterInfo
*
src
);
void
tscColumnBaseCopy
(
SColumnBase
*
dst
,
const
SColumnBase
*
src
);
...
...
@@ -172,31 +172,36 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
void
tscTagCondRelease
(
STagCond
*
pCond
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
S
SqlCmd
*
pCmd
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
S
QueryInfo
*
pQueryInfo
);
void
tscSetFreeHeatBeat
(
STscObj
*
pObj
);
bool
tscShouldFreeHeatBeat
(
SSqlObj
*
pHb
);
void
tscCleanSqlCmd
(
SSqlCmd
*
pCmd
);
bool
tscShouldFreeAsyncSqlObj
(
SSqlObj
*
pSql
);
void
tscRemoveAllMeterMetaInfo
(
SSqlCmd
*
pCmd
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscGetMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
index
);
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SSqlCmd
*
pCmd
,
uint64_t
uid
,
int32_t
*
index
);
void
tscRemoveAllMeterMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscGetMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
SMeterMetaInfo
*
tscGetMeterMetaInfoFromQueryInfo
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
SQueryInfo
*
tscGetQueryInfoDetail
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
int32_t
subClauseIndex
,
uint64_t
uid
,
int32_t
*
index
);
void
tscClearMeterMetaInfo
(
SMeterMetaInfo
*
pMeterMetaInfo
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscAddMeterMetaInfo
(
SSqlCmd
*
pCmd
,
const
char
*
name
,
SMeterMeta
*
pMeterMeta
,
SMetricMeta
*
pMetricMeta
,
SMeterMetaInfo
*
tscAddMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
const
char
*
name
,
SMeterMeta
*
pMeterMeta
,
SMetricMeta
*
pMetricMeta
,
int16_t
numOfTags
,
int16_t
*
tags
);
SMeterMetaInfo
*
tscAddEmptyMeterMetaInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscAddQueryInfo
(
SSqlCmd
*
pCmd
);
SMeterMetaInfo
*
tscAddEmptyMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
int32_t
tscAddSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscFreeSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscGetMetricMetaCacheKey
(
SSqlCmd
*
pCmd
,
char
*
keyStr
,
uint64_t
uid
);
void
tscGetMetricMetaCacheKey
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
char
*
keyStr
,
uint64_t
uid
);
int
tscGetMetricMeta
(
SSqlObj
*
pSql
);
int
tscGetMeterMeta
(
SSqlObj
*
pSql
,
char
*
meterId
,
int32_t
tableIndex
);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
char
*
meterId
,
bool
createIfNotExists
);
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscAddTimestampColumn
(
S
SqlCmd
*
pCmd
,
int16_t
functionId
,
int16_t
tableIndex
);
void
tscAddTimestampColumn
(
S
QueryInfo
*
pQueryInfo
,
int16_t
functionId
,
int16_t
tableIndex
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
/**
...
...
@@ -217,9 +222,9 @@ void tscDoQuery(SSqlObj* pSql);
* @return
*/
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
void
(
*
fp
)(),
void
*
param
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
tableIndex
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
void
doAddGroupColumnForSubquery
(
S
SqlCmd
*
pCmd
,
int32_t
tagIndex
);
void
doAddGroupColumnForSubquery
(
S
QueryInfo
*
pQueryInfo
,
int32_t
tagIndex
);
int16_t
tscGetJoinTagColIndexByUid
(
STagCond
*
pTagCond
,
uint64_t
uid
);
...
...
@@ -228,7 +233,7 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void
sortRemoveDuplicates
(
STableDataBlocks
*
dataBuf
);
void
tscPrintSelectClause
(
SSqlCmd
*
pCmd
);
void
tscPrintSelectClause
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
#ifdef __cplusplus
}
...
...
src/client/inc/tsclient.h
浏览文件 @
e15aa9b1
...
...
@@ -39,9 +39,9 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#define TSC_GET_RESPTR_BASE(res,
cmd
, col, ord) \
((res->data + tscFieldInfoGetOffset(
cmd
, col) * res->numOfRows) + \
(1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(
cmd
, col)->bytes)
#define TSC_GET_RESPTR_BASE(res,
_queryinfo
, col, ord) \
((res->data + tscFieldInfoGetOffset(
_queryinfo
, col) * res->numOfRows) + \
(1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(
_queryinfo
, col)->bytes)
// forward declaration
struct
SSqlInfo
;
...
...
@@ -208,6 +208,7 @@ typedef struct SDataBlockList {
}
SDataBlockList
;
typedef
struct
SQueryInfo
{
uint16_t
type
;
// query type
char
intervalTimeUnit
;
int64_t
etime
,
stime
;
...
...
@@ -221,16 +222,18 @@ typedef struct SQueryInfo {
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
SOrderVal
order
;
int16_t
interpoType
;
// interpolate type
int16_t
numOfTables
;
SMeterMetaInfo
**
pMeterInfo
;
struct
STSBuf
*
tsBuf
;
// todo use dynamic allocated memory for defaultVal
int64_t
defaultVal
[
TSDB_MAX_COLUMNS
];
// default value for interpolation
char
*
msg
;
// pointer to the pCmd->payload to keep error message temporarily
}
SQueryInfo
;
typedef
struct
{
SOrderVal
order
;
//
SOrderVal order;
int
command
;
int
count
;
// TODO refactor
...
...
@@ -241,7 +244,6 @@ typedef struct {
int8_t
isInsertFromFile
;
// load data from file or not
uint8_t
msgType
;
uint16_t
type
;
// query type
/*
* use to keep short request msg and error msg, in such case, SSqlCmd->payload == SSqlCmd->ext;
...
...
@@ -255,8 +257,8 @@ typedef struct {
short
numOfCols
;
int64_t
globalLimit
;
SQueryInfo
*
pQueryInfo
;
int32_t
numOf
Queries
;
SQueryInfo
*
*
pQueryInfo
;
int32_t
numOf
Clause
;
// char intervalTimeUnit;
// int64_t etime, stime;
...
...
@@ -421,12 +423,12 @@ int taos_retrieve(TAOS_RES *res);
* transfer function for metric query in stream computing, the function need to be change
* before send query message to vnode
*/
int32_t
tscTansformSQLFunctionForMetricQuery
(
S
SqlCmd
*
pCmd
);
void
tscRestoreSQLFunctionForMetricQuery
(
S
SqlCmd
*
pCmd
);
int32_t
tscTansformSQLFunctionForMetricQuery
(
S
QueryInfo
*
pQueryInfo
);
void
tscRestoreSQLFunctionForMetricQuery
(
S
QueryInfo
*
pQueryInfo
);
void
tscClearSqlMetaInfoForce
(
SSqlCmd
*
pCmd
);
int32_t
tscCreateResPointerInfo
(
S
SqlCmd
*
pCmd
,
SSqlRes
*
pRes
);
int32_t
tscCreateResPointerInfo
(
S
QueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
void
tscDestroyResPointerInfo
(
SSqlRes
*
pRes
);
void
tscFreeSqlCmdData
(
SSqlCmd
*
pCmd
);
...
...
@@ -454,6 +456,8 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowL
bool
tscIsUpdateQuery
(
STscObj
*
pObj
);
bool
tscHasReachLimitation
(
SSqlObj
*
pSql
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
int32_t
tscInvalidSQLErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
// transfer SSqlInfo to SqlCmd struct
...
...
src/client/src/sql.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/client/src/tscAsync.c
浏览文件 @
e15aa9b1
...
...
@@ -118,10 +118,12 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if
(
numOfRows
==
0
&&
tscProjectionQueryOnMetric
(
pCmd
))
{
if
(
numOfRows
==
0
&&
tscProjectionQueryOnMetric
(
pCmd
,
0
))
{
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
assert
(
pMeterMetaInfo
->
vnodeIndex
>=
0
);
/* reach the maximum number of output rows, abort */
...
...
@@ -131,8 +133,9 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
}
/* update the limit value according to current retrieval results */
pCmd
->
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
pCmd
->
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
if
((
++
(
pMeterMetaInfo
->
vnodeIndex
))
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
tscTrace
(
"%p retrieve data from next vnode:%d"
,
pSql
,
pMeterMetaInfo
->
vnodeIndex
);
...
...
@@ -266,13 +269,14 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
if
(
numOfRows
==
0
)
{
// sequentially retrieve data from remain vnodes.
if
(
tscProjectionQueryOnMetric
(
pCmd
))
{
if
(
tscProjectionQueryOnMetric
(
pCmd
,
0
))
{
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
*/
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
assert
(
pMeterMetaInfo
->
vnodeIndex
>=
0
);
/* reach the maximum number of output rows, abort */
...
...
@@ -282,7 +286,8 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
}
/* update the limit value according to current retrieval results */
pCmd
->
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
// reset flag to launch query first.
...
...
@@ -297,7 +302,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
}
}
else
{
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
Cmd
,
i
,
pCmd
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
QueryInfo
,
i
,
pQueryInfo
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
pRes
->
row
++
;
(
*
pSql
->
fetchFp
)(
pSql
->
param
,
pSql
,
pSql
->
res
.
tsrow
);
...
...
@@ -308,9 +313,13 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
assert
(
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
assert
(
pCmd
->
numOfCols
==
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
Cmd
,
i
,
pCmd
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
QueryInfo
,
i
,
pQueryInfo
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
pRes
->
row
++
;
(
*
pSql
->
fetchFp
)(
pSql
->
param
,
pSql
,
pRes
->
tsrow
);
...
...
@@ -406,8 +415,11 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
assert
(
!
pCmd
->
isInsertFromFile
&&
pSql
->
signature
==
pSql
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
assert
(
pCmd
->
pQueryInfo
->
numOfTables
==
1
);
int32_t
index
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
index
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
index
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
if
(
pDataBlocks
==
NULL
||
pMeterMetaInfo
->
vnodeIndex
>=
pDataBlocks
->
nSize
)
{
...
...
@@ -465,9 +477,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace
(
"%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d"
,
pSql
,
pSql
->
cmd
.
command
,
pSql
->
res
.
code
,
pSql
->
thandle
,
pSql
->
retry
);
assert
(
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
)
->
pMeterMeta
==
NULL
);
tscGetMeterMeta
(
pSql
,
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
)
->
name
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
0
);
assert
(
pMeterMetaInfo
->
pMeterMeta
==
NULL
);
tscGetMeterMeta
(
pSql
,
pMeterMetaInfo
->
name
,
0
);
code
=
tscSendMsgToServer
(
pSql
);
if
(
code
!=
0
)
{
pRes
->
code
=
code
;
...
...
@@ -486,18 +499,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if
(
pSql
->
pStream
==
NULL
)
{
// check if it is a sub-query of metric query first, if true, enter another routine
if
((
pSql
->
cmd
.
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
==
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
==
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
assert
(
pMeterMetaInfo
->
pMeterMeta
->
numOfTags
!=
0
&&
pMeterMetaInfo
->
vnodeIndex
>=
0
&&
pSql
->
param
!=
NULL
);
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
pSql
->
param
;
SSqlObj
*
pParObj
=
trs
->
pParentSqlObj
;
assert
(
pParObj
->
signature
==
pParObj
&&
trs
->
subqueryIndex
==
pMeterMetaInfo
->
vnodeIndex
&&
pMeterMetaInfo
->
pMeterMeta
->
numOfTags
!=
0
);
tscTrace
(
"%p get metricMeta during metric query successfully"
,
pSql
);
code
=
tscGetMeterMeta
(
pSql
,
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
)
->
name
,
0
);
code
=
tscGetMeterMeta
(
pSql
,
pMeterMetaInfo
->
name
,
0
);
pRes
->
code
=
code
;
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
...
...
@@ -512,14 +528,14 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
}
else
{
// stream computing
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
code
=
tscGetMeterMeta
(
pSql
,
pMeterMetaInfo
->
name
,
0
);
pRes
->
code
=
code
;
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
if
(
code
==
TSDB_CODE_SUCCESS
&&
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
if
(
code
==
TSDB_CODE_SUCCESS
&&
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
code
=
tscGetMetricMeta
(
pSql
);
pRes
->
code
=
code
;
...
...
@@ -539,7 +555,9 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
* transfer the sql function for metric query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/
tscTansformSQLFunctionForMetricQuery
(
&
pSql
->
cmd
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
tscTansformSQLFunctionForMetricQuery
(
pQueryInfo
);
tscIncStreamExecutionCount
(
pSql
->
pStream
);
}
else
{
tscTrace
(
"%p get meterMeta/metricMeta successfully"
,
pSql
);
...
...
src/client/src/tscJoinProcess.c
浏览文件 @
e15aa9b1
...
...
@@ -53,11 +53,16 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
*
st
=
INT64_MAX
;
*
et
=
INT64_MIN
;
SLimitVal
*
pLimit
=
&
pSql
->
cmd
.
pQueryInfo
->
limit
;
int32_t
order
=
pSql
->
cmd
.
order
.
order
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pSql
->
pSubs
[
0
]
->
cmd
.
pQueryInfo
->
tsBuf
=
output1
;
pSql
->
pSubs
[
1
]
->
cmd
.
pQueryInfo
->
tsBuf
=
output2
;
SLimitVal
*
pLimit
=
&
pQueryInfo
->
limit
;
int32_t
order
=
pQueryInfo
->
order
.
order
;
SQueryInfo
*
pSubQueryInfo1
=
tscGetQueryInfoDetail
(
&
pSql
->
pSubs
[
0
]
->
cmd
,
0
);
SQueryInfo
*
pSubQueryInfo2
=
tscGetQueryInfoDetail
(
&
pSql
->
pSubs
[
1
]
->
cmd
,
0
);
pSubQueryInfo1
->
tsBuf
=
output1
;
pSubQueryInfo2
->
tsBuf
=
output2
;
tsBufResetPos
(
pSupporter1
->
pTSBuf
);
tsBufResetPos
(
pSupporter2
->
pTSBuf
);
...
...
@@ -113,7 +118,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
}
// in case of stable query, limit/offset is not applied here
if
(
pLimit
->
offset
==
0
||
pSql
->
cmd
.
pQueryInfo
->
nAggTimeInterval
>
0
||
QUERY_IS_STABLE_QUERY
(
pSql
->
cmd
.
type
))
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pLimit
->
offset
==
0
||
pQueryInfo
->
nAggTimeInterval
>
0
||
QUERY_IS_STABLE_QUERY
(
pQueryInfo
->
type
))
{
tsBufAppend
(
output1
,
elem1
.
vnode
,
elem1
.
tag
,
(
const
char
*
)
&
elem1
.
ts
,
sizeof
(
elem1
.
ts
));
tsBufAppend
(
output2
,
elem2
.
vnode
,
elem2
.
tag
,
(
const
char
*
)
&
elem2
.
ts
,
sizeof
(
elem2
.
ts
));
}
else
{
...
...
@@ -168,10 +175,12 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
pSupporter
->
pState
=
pState
;
pSupporter
->
subqueryIndex
=
index
;
pSupporter
->
interval
=
pSql
->
cmd
.
pQueryInfo
->
nAggTimeInterval
;
pSupporter
->
limit
=
pSql
->
cmd
.
pQueryInfo
->
limit
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pSupporter
->
interval
=
pQueryInfo
->
nAggTimeInterval
;
pSupporter
->
limit
=
pQueryInfo
->
limit
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
index
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
index
);
pSupporter
->
uid
=
pMeterMetaInfo
->
pMeterMeta
->
uid
;
getTmpfilePath
(
"join-"
,
pSupporter
->
path
);
...
...
@@ -209,10 +218,9 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
* primary timestamp column , the secondary query is not necessary
*
*/
bool
needSecondaryQuery
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
for
(
int32_t
i
=
0
;
i
<
pCmd
->
pQueryInfo
->
colList
.
numOfCols
;
++
i
)
{
SColumnBase
*
pBase
=
tscColumnBaseInfoGet
(
&
pCmd
->
pQueryInfo
->
colList
,
i
);
bool
needSecondaryQuery
(
SQueryInfo
*
pQueryInfo
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
colList
.
numOfCols
;
++
i
)
{
SColumnBase
*
pBase
=
tscColumnBaseInfoGet
(
&
pQueryInfo
->
colList
,
i
);
if
(
pBase
->
colIndex
.
columnIndex
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
true
;
}
...
...
@@ -272,53 +280,58 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscFreeSqlCmdData
(
&
pNew
->
cmd
);
pSql
->
pSubs
[
j
++
]
=
pNew
;
pNew
->
cmd
.
pQueryInfo
->
tsBuf
=
pSub
->
cmd
.
pQueryInfo
->
tsBuf
;
pSub
->
cmd
.
pQueryInfo
->
tsBuf
=
NULL
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pNew
->
cmd
,
0
);
SQueryInfo
*
pSubQueryInfo
=
tscGetQueryInfoDetail
(
&
pSub
->
cmd
,
0
);
pQueryInfo
->
tsBuf
=
pSubQueryInfo
->
tsBuf
;
pSubQueryInfo
->
tsBuf
=
NULL
;
taos_free_result
(
pSub
);
// set the second stage sub query for join process
p
New
->
cmd
.
type
|=
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
;
p
QueryInfo
->
type
|=
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
;
p
New
->
cmd
.
p
QueryInfo
->
nAggTimeInterval
=
pSupporter
->
interval
;
p
New
->
cmd
.
p
QueryInfo
->
groupbyExpr
=
pSupporter
->
groupbyExpr
;
pQueryInfo
->
nAggTimeInterval
=
pSupporter
->
interval
;
pQueryInfo
->
groupbyExpr
=
pSupporter
->
groupbyExpr
;
tscColumnBaseInfoCopy
(
&
p
New
->
cmd
.
p
QueryInfo
->
colList
,
&
pSupporter
->
colList
,
0
);
tscTagCondCopy
(
&
p
New
->
cmd
.
p
QueryInfo
->
tagCond
,
&
pSupporter
->
tagCond
);
tscColumnBaseInfoCopy
(
&
pQueryInfo
->
colList
,
&
pSupporter
->
colList
,
0
);
tscTagCondCopy
(
&
pQueryInfo
->
tagCond
,
&
pSupporter
->
tagCond
);
tscSqlExprCopy
(
&
p
New
->
cmd
.
p
QueryInfo
->
exprsInfo
,
&
pSupporter
->
exprsInfo
,
pSupporter
->
uid
);
tscFieldInfoCopyAll
(
&
pSupporter
->
fieldsInfo
,
&
p
New
->
cmd
.
p
QueryInfo
->
fieldsInfo
);
tscSqlExprCopy
(
&
pQueryInfo
->
exprsInfo
,
&
pSupporter
->
exprsInfo
,
pSupporter
->
uid
);
tscFieldInfoCopyAll
(
&
pSupporter
->
fieldsInfo
,
&
pQueryInfo
->
fieldsInfo
);
// add the ts function for interval query if it is missing
if
(
pSupporter
->
exprsInfo
.
pExprs
[
0
].
functionId
!=
TSDB_FUNC_TS
&&
p
New
->
cmd
.
p
QueryInfo
->
nAggTimeInterval
>
0
)
{
tscAddTimestampColumn
(
&
pNew
->
cmd
,
TSDB_FUNC_TS
,
0
);
if
(
pSupporter
->
exprsInfo
.
pExprs
[
0
].
functionId
!=
TSDB_FUNC_TS
&&
pQueryInfo
->
nAggTimeInterval
>
0
)
{
tscAddTimestampColumn
(
pQueryInfo
,
TSDB_FUNC_TS
,
0
);
}
// todo refactor function name
tscAddTimestampColumn
(
&
pNew
->
cmd
,
TSDB_FUNC_TS
,
0
);
tscFieldInfoCalOffset
(
&
pNew
->
cmd
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoDetail
(
&
pNew
->
cmd
,
0
);
tscAddTimestampColumn
(
pQueryInfo
,
TSDB_FUNC_TS
,
0
);
tscFieldInfoCalOffset
(
pNewQueryInfo
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pNew
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pNewQueryInfo
,
0
);
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
*/
pSupporter
->
limit
=
p
Sql
->
cmd
.
p
QueryInfo
->
limit
;
pNew
->
cmd
.
p
QueryInfo
->
limit
=
pSupporter
->
limit
;
pSupporter
->
limit
=
pQueryInfo
->
limit
;
pNewQueryInfo
->
limit
=
pSupporter
->
limit
;
// fetch the join tag column
if
(
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pNew
->
cmd
,
0
);
assert
(
p
New
->
cmd
.
p
QueryInfo
->
tagCond
.
joinInfo
.
hasJoin
);
if
(
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pNewQueryInfo
,
0
);
assert
(
pQueryInfo
->
tagCond
.
joinInfo
.
hasJoin
);
int16_t
tagColIndex
=
tscGetJoinTagColIndexByUid
(
&
p
New
->
cmd
.
p
QueryInfo
->
tagCond
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
int16_t
tagColIndex
=
tscGetJoinTagColIndexByUid
(
&
pQueryInfo
->
tagCond
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
pExpr
->
param
[
0
].
i64Key
=
tagColIndex
;
pExpr
->
numOfParams
=
1
;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause
(
&
pNew
->
cmd
);
tscPrintSelectClause
(
&
pNew
->
cmd
,
0
);
#endif
tscProcessSql
(
pNew
);
...
...
@@ -370,10 +383,12 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter
// update the query time range according to the join results on timestamp
static
void
updateQueryTimeRange
(
SSqlObj
*
pSql
,
int64_t
st
,
int64_t
et
)
{
assert
(
pSql
->
cmd
.
pQueryInfo
->
stime
<=
st
&&
pSql
->
cmd
.
pQueryInfo
->
etime
>=
et
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pSql
->
cmd
.
pQueryInfo
->
stime
=
st
;
pSql
->
cmd
.
pQueryInfo
->
etime
=
et
;
assert
(
pQueryInfo
->
stime
<=
st
&&
pQueryInfo
->
etime
>=
et
);
pQueryInfo
->
stime
=
st
;
pQueryInfo
->
etime
=
et
;
}
static
void
joinRetrieveCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
...
...
@@ -381,8 +396,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlObj
*
pParentSql
=
pSupporter
->
pObj
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
((
p
Sql
->
cmd
.
type
&
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
==
0
)
{
if
((
p
QueryInfo
->
type
&
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
==
0
)
{
if
(
pSupporter
->
pState
->
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
,
numOfRows
,
pSupporter
->
pState
->
code
);
...
...
@@ -408,8 +424,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace
(
"%p create tmp file for ts block:%s"
,
pSql
,
pBuf
->
path
);
pSupporter
->
pTSBuf
=
pBuf
;
}
else
{
assert
(
p
Sql
->
cmd
.
p
QueryInfo
->
numOfTables
==
1
);
// for subquery, only one metermetaInfo
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for subquery, only one metermetaInfo
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
tsBufMerge
(
pSupporter
->
pTSBuf
,
pBuf
,
pMeterMetaInfo
->
vnodeIndex
);
tsBufDestory
(
pBuf
);
...
...
@@ -422,9 +438,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a
(
tres
,
joinRetrieveCallback
,
param
);
}
else
if
(
numOfRows
==
0
)
{
// no data from this vnode anymore
if
(
tscProjectionQueryOnMetric
(
&
pParentSql
->
cmd
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
assert
(
pSql
->
cmd
.
pQueryInfo
->
numOfTables
==
1
);
if
(
tscProjectionQueryOnMetric
(
&
pSql
->
cmd
,
0
))
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for projection query, need to try next vnode
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
...
...
@@ -478,9 +496,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql
->
res
.
numOfTotal
+=
pSql
->
res
.
numOfRows
;
}
if
(
tscProjectionQueryOnMetric
(
&
pSql
->
cmd
)
&&
numOfRows
==
0
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
assert
(
pSql
->
cmd
.
pQueryInfo
->
numOfTables
==
1
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
tscProjectionQueryOnMetric
(
pCmd
,
0
)
&&
numOfRows
==
0
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for projection query, need to try next vnode if current vnode is exhausted
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
...
...
@@ -516,13 +536,14 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
assert
(
pSql
->
numOfSubs
>=
1
);
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SSqlRes
*
pRes
=
&
pSql
->
pSubs
[
i
]
->
res
;
S
SqlCmd
*
pCmd
=
&
pSql
->
pSubs
[
i
]
->
cmd
;
S
QueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
pSubs
[
i
]
->
cmd
,
0
)
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
if
(
tscProjectionQueryOnMetric
(
pCmd
))
{
if
(
tscProjectionQueryOnMetric
(
&
pSql
->
cmd
,
0
))
{
if
(
pRes
->
row
>=
pRes
->
numOfRows
&&
pMeterMetaInfo
->
vnodeIndex
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
&&
(
!
tscHasReachLimitation
(
pSql
->
pSubs
[
i
])))
{
numOfFetch
++
;
...
...
@@ -555,9 +576,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// wait for all subqueries completed
pSupporter
->
pState
->
numOfTotal
=
numOfFetch
;
assert
(
pRes1
->
numOfRows
>=
0
&&
pCmd1
->
pQueryInfo
->
numOfTables
==
1
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd1
,
0
);
assert
(
pRes1
->
numOfRows
>=
0
&&
pQueryInfo
->
numOfTables
==
1
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd1
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
if
(
pRes1
->
row
>=
pRes1
->
numOfRows
)
{
tscTrace
(
"%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d"
,
pSql
,
pSql1
,
...
...
@@ -589,16 +612,17 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
return
;
// the column transfer support struct has been built
}
pRes
->
pColumnIndex
=
calloc
(
1
,
sizeof
(
SColumnIndex
)
*
pCmd
->
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pRes
->
pColumnIndex
=
calloc
(
1
,
sizeof
(
SColumnIndex
)
*
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
);
for
(
int32_t
i
=
0
;
i
<
p
Cmd
->
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
Cmd
,
i
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
QueryInfo
,
i
);
int32_t
tableIndexOfSub
=
-
1
;
for
(
int32_t
j
=
0
;
j
<
p
Cmd
->
p
QueryInfo
->
numOfTables
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQueryInfo
->
numOfTables
;
++
j
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
j
];
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSub
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSub
->
cmd
,
0
,
0
);
if
(
pMeterMetaInfo
->
pMeterMeta
->
uid
==
pExpr
->
uid
)
{
tableIndexOfSub
=
j
;
break
;
...
...
@@ -606,9 +630,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
}
SSqlCmd
*
pSubCmd
=
&
pSql
->
pSubs
[
tableIndexOfSub
]
->
cmd
;
SQueryInfo
*
pSubQueryInfo
=
tscGetQueryInfoDetail
(
pSubCmd
,
0
);
for
(
int32_t
k
=
0
;
k
<
pSub
Cmd
->
p
QueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
SSqlExpr
*
pSubExpr
=
tscSqlExprGet
(
pSub
Cmd
,
k
);
for
(
int32_t
k
=
0
;
k
<
pSubQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
SSqlExpr
*
pSubExpr
=
tscSqlExprGet
(
pSub
QueryInfo
,
k
);
if
(
pExpr
->
functionId
==
pSubExpr
->
functionId
&&
pExpr
->
colInfo
.
colId
==
pSubExpr
->
colInfo
.
colId
)
{
pRes
->
pColumnIndex
[
i
]
=
(
SColumnIndex
){.
tableIndex
=
tableIndexOfSub
,
.
columnIndex
=
k
};
break
;
...
...
@@ -619,7 +644,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void
tscJoinQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0
, 0
);
// int32_t idx = pSql->cmd.vnodeIdx;
...
...
@@ -648,7 +673,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// }
// } else {
if
((
pSql
->
cmd
.
type
&
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
!=
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
!=
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// direct call joinRetrieveCallback and set the error code
joinRetrieveCallback
(
param
,
pSql
,
code
);
}
else
{
// first stage query, continue to retrieve data
...
...
@@ -678,13 +704,14 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if
(
atomic_add_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
tscSetupOutputColumnIndex
(
pParentSql
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
/**
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if
(
pMeterMetaInfo
->
vnodeIndex
>
0
&&
tscProjectionQueryOnMetric
(
&
pSql
->
cmd
))
{
if
(
pMeterMetaInfo
->
vnodeIndex
>
0
&&
tscProjectionQueryOnMetric
(
&
pSql
->
cmd
,
0
))
{
assert
(
pMeterMetaInfo
->
vnodeIndex
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
);
pSupporter
->
pState
->
numOfCompleted
=
0
;
// reset the record value
...
...
src/client/src/tscLocal.c
浏览文件 @
e15aa9b1
...
...
@@ -85,7 +85,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/
static
int32_t
tscMaxLengthOfTagsFields
(
SSqlObj
*
pSql
)
{
SMeterMeta
*
pMeta
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
)
->
pMeterMeta
;
SMeterMeta
*
pMeta
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
0
)
->
pMeterMeta
;
if
(
pMeta
->
meterType
==
TSDB_METER_METRIC
||
pMeta
->
meterType
==
TSDB_METER_OTABLE
||
pMeta
->
meterType
==
TSDB_METER_STABLE
)
{
...
...
@@ -114,8 +114,9 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
// one column for each row
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
SMeterMeta
*
pMeta
=
pMeterMetaInfo
->
pMeterMeta
;
/*
...
...
@@ -127,7 +128,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
int32_t
numOfRows
=
pMeta
->
numOfColumns
;
int32_t
totalNumOfRows
=
numOfRows
+
pMeta
->
numOfTags
;
if
(
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
if
(
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
numOfRows
=
pMeta
->
numOfColumns
+
pMeta
->
numOfTags
;
}
...
...
@@ -135,31 +136,31 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSchema
*
pSchema
=
tsGetSchema
(
pMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TSDB_COL_NAME_LEN
);
char
*
type
=
tDataTypeDesc
[
pSchema
[
i
].
type
].
aName
;
pField
=
tscFieldInfoGetField
(
p
Cmd
,
1
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
1
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
int32_t
bytes
=
pSchema
[
i
].
bytes
;
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
)
{
bytes
=
bytes
/
TSDB_NCHAR_SIZE
;
}
pField
=
tscFieldInfoGetField
(
p
Cmd
,
2
);
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
2
);
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
pField
=
tscFieldInfoGetField
(
p
Cmd
,
3
);
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
3
);
if
(
i
>=
pMeta
->
numOfColumns
&&
pMeta
->
numOfTags
!=
0
)
{
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
"tag"
,
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
"tag"
,
strlen
(
"tag"
)
+
1
);
}
}
if
(
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
if
(
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
return
0
;
}
...
...
@@ -167,27 +168,27 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char
*
pTagValue
=
tsGetTagsValue
(
pMeta
);
for
(
int32_t
i
=
numOfRows
;
i
<
totalNumOfRows
;
++
i
)
{
// field name
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TSDB_COL_NAME_LEN
);
// type name
pField
=
tscFieldInfoGetField
(
p
Cmd
,
1
);
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
1
);
char
*
type
=
tDataTypeDesc
[
pSchema
[
i
].
type
].
aName
;
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
// type length
int32_t
bytes
=
pSchema
[
i
].
bytes
;
pField
=
tscFieldInfoGetField
(
p
Cmd
,
2
);
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
2
);
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
)
{
bytes
=
bytes
/
TSDB_NCHAR_SIZE
;
}
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
// tag value
pField
=
tscFieldInfoGetField
(
p
Cmd
,
3
);
char
*
target
=
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
;
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
3
);
char
*
target
=
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
;
if
(
isNull
(
pTagValue
,
pSchema
[
i
].
type
))
{
sprintf
(
target
,
"%s"
,
TSDB_DATA_NULL_STR
);
...
...
@@ -252,25 +253,28 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
numOfCols
=
numOfCols
;
pCmd
->
order
.
order
=
TSQL_SO_ASC
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pQueryInfo
->
order
.
order
=
TSQL_SO_ASC
;
tscFieldInfoSetValue
(
&
p
Cmd
->
p
QueryInfo
->
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
"Field"
,
TSDB_COL_NAME_LEN
);
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
"Field"
,
TSDB_COL_NAME_LEN
);
rowLen
+=
TSDB_COL_NAME_LEN
;
tscFieldInfoSetValue
(
&
p
Cmd
->
p
QueryInfo
->
fieldsInfo
,
1
,
TSDB_DATA_TYPE_BINARY
,
"Type"
,
typeColLength
);
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
1
,
TSDB_DATA_TYPE_BINARY
,
"Type"
,
typeColLength
);
rowLen
+=
typeColLength
;
tscFieldInfoSetValue
(
&
p
Cmd
->
p
QueryInfo
->
fieldsInfo
,
2
,
TSDB_DATA_TYPE_INT
,
"Length"
,
sizeof
(
int32_t
));
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
2
,
TSDB_DATA_TYPE_INT
,
"Length"
,
sizeof
(
int32_t
));
rowLen
+=
sizeof
(
int32_t
);
tscFieldInfoSetValue
(
&
p
Cmd
->
p
QueryInfo
->
fieldsInfo
,
3
,
TSDB_DATA_TYPE_BINARY
,
"Note"
,
noteColLength
);
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
3
,
TSDB_DATA_TYPE_BINARY
,
"Note"
,
noteColLength
);
rowLen
+=
noteColLength
;
return
rowLen
;
}
static
int32_t
tscProcessDescribeTable
(
SSqlObj
*
pSql
)
{
assert
(
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
)
->
pMeterMeta
!=
NULL
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
assert
(
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
)
->
pMeterMeta
!=
NULL
);
const
int32_t
NUM_OF_DESCRIBE_TABLE_COLUMNS
=
4
;
const
int32_t
TYPE_COLUMN_LENGTH
=
16
;
...
...
@@ -283,7 +287,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
int32_t
rowLen
=
tscBuildMeterSchemaResultFields
(
pSql
,
NUM_OF_DESCRIBE_TABLE_COLUMNS
,
TYPE_COLUMN_LENGTH
,
note_field_length
);
tscFieldInfoCalOffset
(
&
pSql
->
cmd
);
tscFieldInfoCalOffset
(
pQueryInfo
);
return
tscSetValueToResObj
(
pSql
,
rowLen
);
}
...
...
@@ -293,7 +297,9 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
// only need to reorganize the results in the column format
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
SSchema
*
pSchema
=
tsGetTagSchema
(
pMeterMetaInfo
->
pMeterMeta
);
...
...
@@ -310,7 +316,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
}
int32_t
totalNumOfResults
=
pMetricMeta
->
numOfMeters
;
int32_t
rowLen
=
tscGetResRowLength
(
p
Cmd
);
int32_t
rowLen
=
tscGetResRowLength
(
p
QueryInfo
);
tscInitResObjForLocalQuery
(
pSql
,
totalNumOfResults
,
rowLen
);
...
...
@@ -321,16 +327,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
for
(
int32_t
j
=
0
;
j
<
pSidList
->
numOfSids
;
++
j
)
{
SMeterSidExtInfo
*
pSidExt
=
tscGetMeterSidInfo
(
pSidList
,
j
);
for
(
int32_t
k
=
0
;
k
<
p
Cmd
->
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SColIndexEx
*
pColIndex
=
&
tscSqlExprGet
(
p
Cmd
,
k
)
->
colInfo
;
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SColIndexEx
*
pColIndex
=
&
tscSqlExprGet
(
p
QueryInfo
,
k
)
->
colInfo
;
int16_t
offsetId
=
pColIndex
->
colIdx
;
assert
((
pColIndex
->
flag
&
TSDB_COL_TAG
)
!=
0
);
char
*
val
=
pSidExt
->
tags
+
vOffset
[
offsetId
];
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
k
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
k
);
memcpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
k
)
*
totalNumOfResults
+
pField
->
bytes
*
rowIdx
,
val
,
memcpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
k
)
*
totalNumOfResults
+
pField
->
bytes
*
rowIdx
,
val
,
(
size_t
)
pField
->
bytes
);
}
rowIdx
++
;
...
...
@@ -344,21 +350,23 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SMetricMeta
*
pMetricMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMetricMeta
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SMetricMeta
*
pMetricMeta
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
)
->
pMetricMeta
;
int32_t
totalNumOfResults
=
1
;
// count function only produce one result
int32_t
rowLen
=
tscGetResRowLength
(
p
Cmd
);
int32_t
rowLen
=
tscGetResRowLength
(
p
QueryInfo
);
tscInitResObjForLocalQuery
(
pSql
,
totalNumOfResults
,
rowLen
);
int32_t
rowIdx
=
0
;
for
(
int32_t
i
=
0
;
i
<
totalNumOfResults
;
++
i
)
{
for
(
int32_t
k
=
0
;
k
<
p
Cmd
->
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
Cmd
,
i
);
for
(
int32_t
k
=
0
;
k
<
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
QueryInfo
,
i
);
if
(
pExpr
->
colInfo
.
colIdx
==
-
1
&&
pExpr
->
functionId
==
TSDB_FUNC_COUNT
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
k
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
k
);
memcpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
Cmd
,
i
)
*
totalNumOfResults
+
pField
->
bytes
*
rowIdx
,
memcpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
p
QueryInfo
,
i
)
*
totalNumOfResults
+
pField
->
bytes
*
rowIdx
,
&
pMetricMeta
->
numOfMeters
,
sizeof
(
pMetricMeta
->
numOfMeters
));
}
else
{
tscError
(
"not support operations"
);
...
...
@@ -374,14 +382,16 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
static
int
tscProcessQueryTags
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMeterMeta
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
)
->
pMeterMeta
;
if
(
pMeterMeta
==
NULL
||
pMeterMeta
->
numOfTags
==
0
||
pMeterMeta
->
numOfColumns
==
0
)
{
strcpy
(
pCmd
->
payload
,
"invalid table"
);
pSql
->
res
.
code
=
TSDB_CODE_INVALID_TABLE
;
return
pSql
->
res
.
code
;
}
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
Cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
QueryInfo
,
0
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_COUNT
)
{
return
tscBuildMetricTagSqlFunctionResult
(
pSql
);
}
else
{
...
...
@@ -390,7 +400,9 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
}
static
void
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
0
);
tscSetLocalQueryResult
(
pSql
,
pSql
->
pTscObj
->
user
,
pExpr
->
aliasName
,
TSDB_USER_LEN
);
}
...
...
@@ -403,19 +415,24 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
setNull
(
db
,
TSDB_DATA_TYPE_BINARY
,
TSDB_DB_NAME_LEN
);
}
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
0
);
tscSetLocalQueryResult
(
pSql
,
db
,
pExpr
->
aliasName
,
TSDB_DB_NAME_LEN
);
}
static
void
tscProcessServerVer
(
SSqlObj
*
pSql
)
{
const
char
*
v
=
pSql
->
pTscObj
->
sversion
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
0
);
tscSetLocalQueryResult
(
pSql
,
v
,
pExpr
->
aliasName
,
tListLen
(
pSql
->
pTscObj
->
sversion
));
}
static
void
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
0
);
tscSetLocalQueryResult
(
pSql
,
version
,
pExpr
->
aliasName
,
strlen
(
version
));
}
...
...
@@ -433,7 +450,9 @@ static void tscProcessServStatus(SSqlObj *pSql) {
}
}
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
0
);
tscSetLocalQueryResult
(
pSql
,
"1"
,
pExpr
->
aliasName
,
2
);
}
...
...
@@ -442,12 +461,13 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
SSqlRes
*
pRes
=
&
pSql
->
res
;
pCmd
->
numOfCols
=
1
;
pCmd
->
order
.
order
=
TSQL_SO_ASC
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pQueryInfo
->
order
.
order
=
TSQL_SO_ASC
;
tscFieldInfoSetValue
(
&
p
Cmd
->
pQueryInfo
[
0
].
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
columnName
,
valueLength
);
tscFieldInfoSetValue
(
&
p
QueryInfo
->
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
columnName
,
valueLength
);
tscInitResObjForLocalQuery
(
pSql
,
1
,
valueLength
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
0
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
0
);
strncpy
(
pRes
->
data
,
val
,
pField
->
bytes
);
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
e15aa9b1
...
...
@@ -651,7 +651,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
static
int32_t
doParseInsertStatement
(
SSqlObj
*
pSql
,
void
*
pTableHashList
,
char
**
str
,
SParsedDataColInfo
*
spd
,
int32_t
*
totalNum
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
SMeterMeta
*
pMeterMeta
=
pMeterMetaInfo
->
pMeterMeta
;
STableDataBlocks
*
dataBuf
=
NULL
;
...
...
@@ -707,7 +707,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
char
*
sql
=
*
sqlstr
;
// get the token of specified table
...
...
@@ -754,7 +754,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
STagData
*
pTag
=
(
STagData
*
)
pCmd
->
payload
;
memset
(
pTag
,
0
,
sizeof
(
STagData
));
setMeterID
(
pSql
,
&
sToken
,
0
);
setMeterID
(
pSql
,
0
,
&
sToken
,
0
);
strncpy
(
pTag
->
name
,
pMeterMetaInfo
->
name
,
TSDB_METER_ID_LEN
);
code
=
tscGetMeterMeta
(
pSql
,
pTag
->
name
,
0
);
...
...
@@ -762,7 +762,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
return
code
;
}
if
(
!
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
if
(
!
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"create table only from super table is allowed"
,
sToken
.
z
);
}
...
...
@@ -894,7 +894,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"invalid table name"
,
*
sqlstr
);
}
int32_t
ret
=
setMeterID
(
pSql
,
&
tableToken
,
0
);
int32_t
ret
=
setMeterID
(
pSql
,
0
,
&
tableToken
,
0
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
...
...
@@ -941,13 +941,13 @@ int validateTableName(char *tblName, int len) {
* @param pSql
* @return
*/
int
doParse
r
InsertSql
(
SSqlObj
*
pSql
,
char
*
str
)
{
int
doParseInsertSql
(
SSqlObj
*
pSql
,
char
*
str
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
TSDB_CODE_INVALID_SQL
;
int32_t
totalNum
=
0
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscAddEmptyMeterMetaInfo
(
pCmd
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscAddEmptyMeterMetaInfo
(
pCmd
,
0
);
if
((
code
=
tscAllocPayload
(
pCmd
,
TSDB_PAYLOAD_SIZE
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -992,7 +992,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
}
//TODO refactor
if
((
code
=
setMeterID
(
pSql
,
&
sToken
,
0
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
setMeterID
(
pSql
,
0
,
&
sToken
,
0
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error_clean
;
}
...
...
@@ -1011,7 +1011,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
}
}
if
(
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
if
(
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"insert data into super table is not supported"
,
NULL
);
goto
_error_clean
;
}
...
...
@@ -1088,7 +1088,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
strcpy
(
pDataBlock
->
filename
,
fname
);
}
else
if
(
sToken
.
type
==
TK_LP
)
{
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMeterMeta
;
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
)
->
pMeterMeta
;
SSchema
*
pSchema
=
tsGetSchema
(
pMeterMeta
);
if
(
pCmd
->
isInsertFromFile
==
-
1
)
{
...
...
@@ -1188,7 +1188,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
0
);
// set the next sent data vnode index in data block arraylist
pMeterMetaInfo
->
vnodeIndex
=
1
;
...
...
@@ -1203,7 +1203,7 @@ _error_clean:
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
_clean:
taosCleanUp
IntHash
(
pSql
->
pTableHashList
);
taosCleanUp
HashTable
(
pSql
->
pTableHashList
);
pSql
->
pTableHashList
=
NULL
;
return
code
;
}
...
...
@@ -1231,14 +1231,14 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
pCmd
->
isInsertFromFile
=
-
1
;
pSql
->
res
.
numOfRows
=
0
;
return
doParse
r
InsertSql
(
pSql
,
sql
+
index
);
return
doParseInsertSql
(
pSql
,
sql
+
index
);
}
int
tsParseSql
(
SSqlObj
*
pSql
,
char
*
acct
,
char
*
db
,
bool
multiVnodeInsertion
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
// must before clean the sqlcmd object
tscRemoveAll
MeterMetaInfo
(
&
pSql
->
cmd
,
false
);
// tscRemove
MeterMetaInfo(&pSql->cmd, false);
if
(
NULL
==
pSql
->
asyncTblPos
)
{
tscCleanSqlCmd
(
&
pSql
->
cmd
);
...
...
@@ -1287,7 +1287,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMeterMeta
;
SMeterMeta
*
pMeterMeta
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
)
->
pMeterMeta
;
SShellSubmitBlock
*
pBlocks
=
(
SShellSubmitBlock
*
)(
pTableDataBlocks
->
pData
);
tsSetBlockInfo
(
pBlocks
,
pMeterMeta
,
numOfRows
);
...
...
@@ -1319,7 +1319,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int
numOfRows
=
0
;
int32_t
code
=
0
;
int
nrows
=
0
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
SMeterMeta
*
pMeterMeta
=
pMeterMetaInfo
->
pMeterMeta
;
int32_t
rowSize
=
pMeterMeta
->
rowSize
;
...
...
@@ -1415,7 +1415,7 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
}
STableDataBlocks
*
pDataBlock
=
NULL
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
/* the first block has been sent to server in processSQL function */
...
...
@@ -1450,7 +1450,7 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) {
return
;
}
SMeterMetaInfo
*
pInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
STableDataBlocks
*
pDataBlock
=
NULL
;
int32_t
affected_rows
=
0
;
...
...
src/client/src/tscPrepare.c
浏览文件 @
e15aa9b1
...
...
@@ -409,7 +409,7 @@ static int insertStmtReset(STscStmt* pStmt) {
}
pCmd
->
batchSize
=
0
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
pMeterMetaInfo
->
vnodeIndex
=
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -423,7 +423,7 @@ static int insertStmtExecute(STscStmt* stmt) {
++
pCmd
->
batchSize
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
,
0
);
if
(
pCmd
->
pDataBlocks
->
nSize
>
0
)
{
// merge according to vgid
...
...
src/client/src/tscSQLParser.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/client/src/tscSecondaryMerge.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/client/src/tscServer.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/client/src/tscSql.c
浏览文件 @
e15aa9b1
...
...
@@ -128,8 +128,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
pSql
->
cmd
.
command
=
TSDB_SQL_CONNECT
;
int
ret
=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
globalCode
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
free
(
pSql
);
free
(
pObj
);
...
...
@@ -298,7 +297,8 @@ int taos_num_fields(TAOS_RES *res) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
SFieldInfo
*
pFieldsInfo
=
&
pSql
->
cmd
.
pQueryInfo
[
0
].
fieldsInfo
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SFieldInfo
*
pFieldsInfo
=
&
pQueryInfo
->
fieldsInfo
;
return
(
pFieldsInfo
->
numOfOutputCols
-
pFieldsInfo
->
numOfHiddenCols
);
}
...
...
@@ -321,7 +321,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
return
pSql
->
cmd
.
pQueryInfo
[
0
].
fieldsInfo
.
pFields
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
return
pQueryInfo
->
fieldsInfo
.
pFields
;
}
int
taos_retrieve
(
TAOS_RES
*
res
)
{
...
...
@@ -370,31 +371,33 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
pRes
->
numOfTotal
+=
pRes
->
numOfRows
;
}
for
(
int
i
=
0
;
i
<
pCmd
->
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pCmd
,
i
,
pCmd
->
order
)
+
pRes
->
bytes
[
i
]
*
(
1
-
pCmd
->
order
.
order
)
*
(
pRes
->
numOfRows
-
1
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
for
(
int
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pQueryInfo
,
i
,
pQueryInfo
->
order
)
+
pRes
->
bytes
[
i
]
*
(
1
-
pQueryInfo
->
order
.
order
)
*
(
pRes
->
numOfRows
-
1
);
}
*
rows
=
pRes
->
tsrow
;
return
(
p
Cmd
->
order
.
order
==
TSQL_SO_DESC
)
?
pRes
->
numOfRows
:
-
pRes
->
numOfRows
;
return
(
p
QueryInfo
->
order
.
order
==
TSQL_SO_DESC
)
?
pRes
->
numOfRows
:
-
pRes
->
numOfRows
;
}
static
void
**
doSetResultRowData
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
int32_t
num
=
0
;
for
(
int
i
=
0
;
i
<
p
Cmd
->
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
Cmd
,
i
,
pCmd
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
for
(
int
i
=
0
;
i
<
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
p
QueryInfo
,
i
,
pQueryInfo
->
order
)
+
pRes
->
bytes
[
i
]
*
pRes
->
row
;
// primary key column cannot be null in interval query, no need to check
if
(
i
==
0
&&
p
Cmd
->
pQueryInfo
[
0
].
nAggTimeInterval
>
0
)
{
if
(
i
==
0
&&
p
QueryInfo
->
nAggTimeInterval
>
0
)
{
continue
;
}
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
i
);
if
(
isNull
(
pRes
->
tsrow
[
i
],
pField
->
type
))
{
pRes
->
tsrow
[
i
]
=
NULL
;
...
...
@@ -419,7 +422,7 @@ static void **doSetResultRowData(SSqlObj *pSql) {
}
}
assert
(
num
<=
p
Cmd
->
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
);
assert
(
num
<=
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
);
return
pRes
->
tsrow
;
}
...
...
@@ -437,15 +440,17 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool
hasData
=
true
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
tscProjectionQueryOnMetric
(
pCmd
))
{
if
(
tscProjectionQueryOnMetric
(
pCmd
,
0
))
{
bool
allSubqueryExhausted
=
true
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SSqlRes
*
pRes1
=
&
pSql
->
pSubs
[
i
]
->
res
;
SSqlCmd
*
pCmd1
=
&
pSql
->
pSubs
[
i
]
->
cmd
;
SMeterMetaInfo
*
pMetaInfo
=
tscGetMeterMetaInfo
(
pCmd1
,
0
);
assert
(
pCmd1
->
pQueryInfo
[
0
].
numOfTables
==
1
);
SMeterMetaInfo
*
pMetaInfo
=
tscGetMeterMetaInfo
(
pCmd1
,
0
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
...
...
@@ -462,10 +467,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
}
else
{
// otherwise, in case inner join, if any subquery exhausted, query completed.
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SSqlRes
*
pRes1
=
&
pSql
->
pSubs
[
i
]
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
pSubs
[
i
]
->
cmd
,
0
);
if
((
pRes1
->
row
>=
pRes1
->
numOfRows
&&
tscHasReachLimitation
(
pSql
->
pSubs
[
i
])
&&
tscProjectionQueryOnTable
(
&
pSql
->
pSubs
[
i
]
->
cmd
))
||
(
pRes1
->
numOfRows
==
0
))
{
tscProjectionQueryOnTable
(
pQueryInfo
))
||
(
pRes1
->
numOfRows
==
0
))
{
hasData
=
false
;
break
;
...
...
@@ -477,7 +482,6 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
}
static
void
**
tscJoinResultsetFromBuf
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
while
(
1
)
{
...
...
@@ -499,8 +503,10 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
return
NULL
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pRes
->
tsrow
==
NULL
)
{
pRes
->
tsrow
=
malloc
(
POINTER_BYTES
*
p
Cmd
->
pQueryInfo
[
0
].
exprsInfo
.
numOfExprs
);
pRes
->
tsrow
=
malloc
(
POINTER_BYTES
*
p
QueryInfo
->
exprsInfo
.
numOfExprs
);
}
bool
success
=
false
;
...
...
@@ -526,7 +532,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
}
if
(
success
)
{
// current row of final output has been built, return to app
for
(
int32_t
i
=
0
;
i
<
p
Cmd
->
pQueryInfo
[
0
].
exprsInfo
.
numOfExprs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
QueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
int32_t
tableIndex
=
pRes
->
pColumnIndex
[
i
].
tableIndex
;
int32_t
columnIndex
=
pRes
->
pColumnIndex
[
i
].
columnIndex
;
...
...
@@ -599,8 +605,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// projection query on metric, pipeline retrieve data from vnode list, instead of two-stage merge
TAOS_ROW
rows
=
taos_fetch_row_impl
(
res
);
while
(
rows
==
NULL
&&
tscProjectionQueryOnMetric
(
pCmd
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
while
(
rows
==
NULL
&&
tscProjectionQueryOnMetric
(
pCmd
,
0
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
// reach the maximum number of output rows, abort
if
(
tscHasReachLimitation
(
pSql
))
{
...
...
@@ -611,8 +620,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
* update the limit and offset value according to current retrieval results
* Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0;
*/
p
Cmd
->
p
QueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
p
Cmd
->
p
QueryInfo
->
limit
.
offset
=
pRes
->
offset
;
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
assert
((
pRes
->
offset
>=
0
&&
pRes
->
numOfRows
==
0
)
||
(
pRes
->
offset
==
0
&&
pRes
->
numOfRows
>=
0
));
...
...
@@ -656,17 +665,19 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// projection query on metric, pipeline retrieve data from vnode list,
// instead of two-stage mergevnodeProcessMsgFromShell free qhandle
nRows
=
taos_fetch_block_impl
(
res
,
rows
);
while
(
*
rows
==
NULL
&&
tscProjectionQueryOnMetric
(
pCmd
))
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
while
(
*
rows
==
NULL
&&
tscProjectionQueryOnMetric
(
pCmd
,
0
))
{
/* reach the maximum number of output rows, abort */
if
(
tscHasReachLimitation
(
pSql
))
{
return
0
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
/* update the limit value according to current retrieval results */
p
Cmd
->
p
QueryInfo
->
limit
.
limit
=
pSql
->
cmd
.
globalLimit
-
pRes
->
numOfTotal
;
p
Cmd
->
p
QueryInfo
->
limit
.
offset
=
pRes
->
offset
;
pQueryInfo
->
limit
.
limit
=
pSql
->
cmd
.
globalLimit
-
pRes
->
numOfTotal
;
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
...
...
@@ -723,9 +734,10 @@ void taos_free_result(TAOS_RES *res) {
}
// set freeFlag to 1 in retrieve message if there are un-retrieved results
pCmd
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
FromQueryInfo
(
pQueryInfo
,
0
);
/*
* case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet.
...
...
@@ -971,7 +983,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
static
int
tscParseTblNameList
(
SSqlObj
*
pSql
,
const
char
*
tblNameList
,
int32_t
tblListLen
)
{
// must before clean the sqlcmd object
tscRemoveAllMeterMetaInfo
(
&
pSql
->
cmd
,
false
);
tscCleanSqlCmd
(
&
pSql
->
cmd
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -982,7 +993,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int
code
=
TSDB_CODE_INVALID_METER_ID
;
char
*
str
=
(
char
*
)
tblNameList
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscAddEmptyMeterMetaInfo
(
pCmd
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscAddEmptyMeterMetaInfo
(
pCmd
,
0
);
if
((
code
=
tscAllocPayload
(
pCmd
,
tblListLen
+
16
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1017,7 +1028,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return
code
;
}
if
((
code
=
setMeterID
(
pSql
,
&
sToken
,
0
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
setMeterID
(
pSql
,
0
,
&
sToken
,
0
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
src/client/src/tscStream.c
浏览文件 @
e15aa9b1
...
...
@@ -31,9 +31,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
static
void
tscSetNextLaunchTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
);
static
void
tscSetRetryTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
,
int64_t
timer
);
static
bool
isProjectStream
(
S
SqlCmd
*
pCmd
)
{
for
(
int32_t
i
=
0
;
i
<
p
Cmd
->
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
Cmd
,
i
);
static
bool
isProjectStream
(
S
QueryInfo
*
pQueryInfo
)
{
for
(
int32_t
i
=
0
;
i
<
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
QueryInfo
,
i
);
if
(
pExpr
->
functionId
!=
TSDB_FUNC_PRJ
)
{
return
false
;
}
...
...
@@ -66,21 +66,23 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
pSql
->
fp
=
tscProcessStreamQueryCallback
;
pSql
->
param
=
pStream
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
int
code
=
tscGetMeterMeta
(
pSql
,
pMeterMetaInfo
->
name
,
0
);
pSql
->
res
.
code
=
code
;
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
0
&&
UTIL_METER_IS_
METRIC
(
pMeterMetaInfo
))
{
if
(
code
==
0
&&
UTIL_METER_IS_
SUPERTABLE
(
pMeterMetaInfo
))
{
code
=
tscGetMetricMeta
(
pSql
);
pSql
->
res
.
code
=
code
;
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
}
tscTansformSQLFunctionForMetricQuery
(
&
pSql
->
cmd
);
tscTansformSQLFunctionForMetricQuery
(
pQueryInfo
);
// failed to get meter/metric meta, retry in 10sec.
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -105,22 +107,23 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream
->
numOfRes
=
0
;
// reset the numOfRes.
SSqlObj
*
pSql
=
pStream
->
pSql
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
tscTrace
(
"%p add into timer"
,
pSql
);
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
if
(
isProjectStream
(
pQueryInfo
))
{
/*
* p
Sql->cmd.pQueryInfo[0].
etime, which is the start time, does not change in case of
* p
QueryInfo->
etime, which is the start time, does not change in case of
* repeat first execution, once the first execution failed.
*/
p
Sql
->
cmd
.
pQueryInfo
[
0
].
stime
=
pStream
->
stime
;
// start time
p
QueryInfo
->
stime
=
pStream
->
stime
;
// start time
p
Sql
->
cmd
.
pQueryInfo
[
0
].
etime
=
taosGetTimestamp
(
pStream
->
precision
);
// end time
if
(
p
Sql
->
cmd
.
pQueryInfo
[
0
].
etime
>
pStream
->
etime
)
{
p
Sql
->
cmd
.
pQueryInfo
[
0
].
etime
=
pStream
->
etime
;
p
QueryInfo
->
etime
=
taosGetTimestamp
(
pStream
->
precision
);
// end time
if
(
p
QueryInfo
->
etime
>
pStream
->
etime
)
{
p
QueryInfo
->
etime
=
pStream
->
etime
;
}
}
else
{
p
Sql
->
cmd
.
pQueryInfo
[
0
].
stime
=
pStream
->
stime
-
pStream
->
interval
;
p
Sql
->
cmd
.
pQueryInfo
[
0
].
etime
=
pStream
->
stime
-
1
;
p
QueryInfo
->
stime
=
pStream
->
stime
-
pStream
->
interval
;
p
QueryInfo
->
etime
=
pStream
->
stime
-
1
;
}
// launch stream computing in a new thread
...
...
@@ -139,7 +142,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError
(
"%p stream:%p, query data failed, code:%d, retry in %"
PRId64
"ms"
,
pStream
->
pSql
,
pStream
,
numOfRows
,
retryDelay
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pStream
->
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pStream
->
pSql
->
cmd
,
0
,
0
);
tscClearMeterMetaInfo
(
pMeterMetaInfo
,
true
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelay
);
...
...
@@ -165,7 +168,7 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
0
);
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
->
slidingTime
,
pStream
->
precision
);
...
...
@@ -178,11 +181,12 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if
(
numOfRows
>
0
)
{
// when reaching here the first execution of stream computing is successful.
pStream
->
numOfRes
+=
numOfRows
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
tscTrace
(
"%p stream:%p fetch result"
,
pSql
,
pStream
);
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
if
(
isProjectStream
(
pQueryInfo
))
{
pStream
->
stime
=
*
(
TSKEY
*
)
row
[
0
];
}
else
{
tscSetTimestampForRes
(
pStream
,
pSql
);
...
...
@@ -197,9 +201,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
}
else
{
// numOfRows == 0, all data has been retrieved
pStream
->
useconds
+=
pSql
->
res
.
useconds
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pStream
->
numOfRes
==
0
)
{
if
(
pSql
->
cmd
.
pQueryInfo
[
0
].
interpoType
==
TSDB_INTERPO_SET_VALUE
||
pSql
->
cmd
.
pQueryInfo
[
0
].
interpoType
==
TSDB_INTERPO_NULL
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
pQueryInfo
->
interpoType
==
TSDB_INTERPO_SET_VALUE
||
pQueryInfo
->
interpoType
==
TSDB_INTERPO_NULL
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
/* failed to retrieve any result in this retrieve */
...
...
@@ -210,11 +215,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
void
*
oldPtr
=
pSql
->
res
.
data
;
pSql
->
res
.
data
=
tmpRes
;
for
(
int32_t
i
=
1
;
i
<
p
Sql
->
cmd
.
pQueryInfo
[
0
].
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
int16_t
offset
=
tscFieldInfoGetOffset
(
p
Cmd
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
Cmd
,
i
);
for
(
int32_t
i
=
1
;
i
<
p
QueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
int16_t
offset
=
tscFieldInfoGetOffset
(
p
QueryInfo
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
p
QueryInfo
,
i
);
assignVal
(
pSql
->
res
.
data
+
offset
,
(
char
*
)(
&
p
Cmd
->
pQueryInfo
[
0
].
defaultVal
[
i
]),
pField
->
bytes
,
pField
->
type
);
assignVal
(
pSql
->
res
.
data
+
offset
,
(
char
*
)(
&
p
QueryInfo
->
defaultVal
[
i
]),
pField
->
bytes
,
pField
->
type
);
row
[
i
]
=
pSql
->
res
.
data
+
offset
;
}
...
...
@@ -222,7 +227,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
row
[
0
]
=
pRes
->
data
;
// char result[512] = {0};
// taos_print_row(result, row, p
Sql->cmd.pQueryInfo[0].fieldsInfo.pFields, pSql->cmd.pQueryInfo[0].
fieldsInfo.numOfOutputCols);
// taos_print_row(result, row, p
QueryInfo->fieldsInfo.pFields, pQueryInfo->
fieldsInfo.numOfOutputCols);
// tscPrint("%p stream:%p query result: %s", pSql, pStream, result);
tscTrace
(
"%p stream:%p fetch result"
,
pSql
,
pStream
);
...
...
@@ -231,7 +236,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pRes
->
numOfRows
=
0
;
pRes
->
data
=
oldPtr
;
}
else
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
}
else
if
(
isProjectStream
(
pQueryInfo
))
{
/* no resuls in the query range, retry */
// todo set retry dynamic time
int32_t
retry
=
tsProjectExecInterval
;
...
...
@@ -242,7 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
return
;
}
}
else
{
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
if
(
isProjectStream
(
pQueryInfo
))
{
pStream
->
stime
+=
1
;
}
}
...
...
@@ -257,7 +262,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
}
static
void
tscSetRetryTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
,
int64_t
timer
)
{
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
isProjectStream
(
pQueryInfo
))
{
int64_t
now
=
taosGetTimestamp
(
pStream
->
precision
);
int64_t
etime
=
now
>
pStream
->
etime
?
pStream
->
etime
:
now
;
...
...
@@ -292,7 +299,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
static
void
tscSetNextLaunchTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
)
{
int64_t
timer
=
0
;
if
(
isProjectStream
(
&
pSql
->
cmd
))
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
isProjectStream
(
pQueryInfo
))
{
/*
* for project query, no mater fetch data successfully or not, next launch will issue
* more than the sliding time window
...
...
@@ -348,55 +356,56 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
}
static
void
tscSetSlidingWindowInfo
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int64_t
minIntervalTime
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinIntervalTime
*
1000L
:
tsMinIntervalTime
;
if
(
pCmd
->
pQueryInfo
[
0
].
nAggTimeInterval
<
minIntervalTime
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
->
nAggTimeInterval
<
minIntervalTime
)
{
tscWarn
(
"%p stream:%p, original sample interval:%ld too small, reset to:%"
PRId64
""
,
pSql
,
pStream
,
p
Cmd
->
pQueryInfo
[
0
].
nAggTimeInterval
,
minIntervalTime
);
p
Cmd
->
pQueryInfo
[
0
].
nAggTimeInterval
=
minIntervalTime
;
p
QueryInfo
->
nAggTimeInterval
,
minIntervalTime
);
p
QueryInfo
->
nAggTimeInterval
=
minIntervalTime
;
}
pStream
->
interval
=
p
Cmd
->
pQueryInfo
[
0
].
nAggTimeInterval
;
// it shall be derived from sql string
pStream
->
interval
=
p
QueryInfo
->
nAggTimeInterval
;
// it shall be derived from sql string
if
(
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
==
0
)
{
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
=
pCmd
->
pQueryInfo
[
0
].
nAggTimeInterval
;
if
(
p
QueryInfo
->
nSlidingTime
==
0
)
{
p
QueryInfo
->
nSlidingTime
=
pQueryInfo
->
nAggTimeInterval
;
}
int64_t
minSlidingTime
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinSlidingTime
*
1000L
:
tsMinSlidingTime
;
if
(
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
<
minSlidingTime
)
{
if
(
p
QueryInfo
->
nSlidingTime
<
minSlidingTime
)
{
tscWarn
(
"%p stream:%p, original sliding value:%"
PRId64
" too small, reset to:%"
PRId64
""
,
pSql
,
pStream
,
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
,
minSlidingTime
);
p
QueryInfo
->
nSlidingTime
,
minSlidingTime
);
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
=
minSlidingTime
;
p
QueryInfo
->
nSlidingTime
=
minSlidingTime
;
}
if
(
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
>
pCmd
->
pQueryInfo
[
0
].
nAggTimeInterval
)
{
if
(
p
QueryInfo
->
nSlidingTime
>
pQueryInfo
->
nAggTimeInterval
)
{
tscWarn
(
"%p stream:%p, sliding value:%"
PRId64
" can not be larger than interval range, reset to:%"
PRId64
""
,
pSql
,
pStream
,
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
,
pCmd
->
pQueryInfo
[
0
].
nAggTimeInterval
);
p
QueryInfo
->
nSlidingTime
,
pQueryInfo
->
nAggTimeInterval
);
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
=
pCmd
->
pQueryInfo
[
0
].
nAggTimeInterval
;
p
QueryInfo
->
nSlidingTime
=
pQueryInfo
->
nAggTimeInterval
;
}
pStream
->
slidingTime
=
p
Cmd
->
pQueryInfo
[
0
].
nSlidingTime
;
pStream
->
slidingTime
=
p
QueryInfo
->
nSlidingTime
;
}
static
int64_t
tscGetStreamStartTimestamp
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
,
int64_t
stime
)
{
S
SqlCmd
*
pCmd
=
&
pSql
->
cmd
;
S
QueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
)
;
if
(
isProjectStream
(
p
Cmd
))
{
if
(
isProjectStream
(
p
QueryInfo
))
{
// no data in table, flush all data till now to destination meter, 10sec delay
pStream
->
interval
=
tsProjectExecInterval
;
pStream
->
slidingTime
=
tsProjectExecInterval
;
if
(
stime
!=
0
)
{
// first projection start from the latest event timestamp
assert
(
stime
>=
p
Cmd
->
pQueryInfo
[
0
].
stime
);
assert
(
stime
>=
p
QueryInfo
->
stime
);
stime
+=
1
;
// exclude the last records from table
}
else
{
stime
=
p
Cmd
->
pQueryInfo
[
0
].
stime
;
stime
=
p
QueryInfo
->
stime
;
}
}
else
{
// timewindow based aggregation stream
if
(
stime
==
0
)
{
// no data in meter till now
...
...
@@ -520,7 +529,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return
NULL
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
...
...
@@ -529,7 +539,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream
->
precision
=
pMeterMetaInfo
->
pMeterMeta
->
precision
;
pStream
->
ctime
=
taosGetTimestamp
(
pStream
->
precision
);
pStream
->
etime
=
p
Cmd
->
pQueryInfo
[
0
].
etime
;
pStream
->
etime
=
p
QueryInfo
->
etime
;
pSql
->
pStream
=
pStream
;
tscAddIntoStreamList
(
pStream
);
...
...
src/client/src/tscUtil.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/inc/sql.y
浏览文件 @
e15aa9b1
...
...
@@ -354,7 +354,8 @@ select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_
union(Y) ::= select(X). { Y = setSubclause(NULL, X); }
union(Y) ::= LP union(X) RP. { Y = X; }
union(Y) ::= union(Z) UNION select(X). { Y = appendSelectClause(Z, X); }
union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); }
union(Y) ::= union(Z) UNION ALL LP select(X) RP. { Y = appendSelectClause(Z, X); }
cmd ::= union(X). { setSQLInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
...
...
src/inc/tsdb.h
浏览文件 @
e15aa9b1
...
...
@@ -186,6 +186,7 @@ extern "C" {
#define TSDB_MAX_TABLES_PER_VNODE 220000
#define TSDB_MAX_JOIN_TABLE_NUM 5
#define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_BYTES_PER_ROW-TSDB_KEYSIZE)
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_BYTES_PER_ROW-TSDB_KEYSIZE)
...
...
src/inc/tsqldef.h
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/inc/tutil.h
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/util/src/hash.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
src/util/src/tutil.c
浏览文件 @
e15aa9b1
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录