Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
47c2796b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
47c2796b
编写于
4月 23, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-171] refactor client code
上级
59a77a36
变更
14
展开全部
显示空白变更内容
内联
并排
Showing
14 changed file
with
661 addition
and
835 deletion
+661
-835
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+23
-26
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+13
-25
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+9
-8
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+58
-39
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+3
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+194
-175
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+59
-45
src/client/src/tscServer.c
src/client/src/tscServer.c
+37
-51
src/client/src/tscSql.c
src/client/src/tscSql.c
+45
-30
src/client/src/tscStream.c
src/client/src/tscStream.c
+4
-4
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+20
-25
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+180
-388
src/inc/taosmsg.h
src/inc/taosmsg.h
+12
-12
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
47c2796b
...
...
@@ -58,7 +58,7 @@ typedef struct SJoinSubquerySupporter {
SLimitVal
limit
;
// limit info
uint64_t
uid
;
// query meter uid
SArray
*
colList
;
// previous query information
S
SqlExprInfo
exprsInfo
;
S
Array
*
exprsInfo
;
SFieldInfo
fieldsInfo
;
STagCond
tagCond
;
SSqlGroupbyExpr
groupbyExpr
;
...
...
@@ -85,8 +85,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
int32_t
startOffset
,
int32_t
rowSize
,
const
char
*
tableId
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
SVnodeSidList
*
tscGetVnodeSidList
(
SSuperTableMeta
*
pMetricmeta
,
int32_t
vnodeIdx
);
STableIdInfo
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
);
UNUSED_FUNC
STableIdInfo
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
);
/**
*
...
...
@@ -115,49 +114,49 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex
void
addRequiredTagColumn
(
SQueryInfo
*
pQueryInfo
,
int32_t
tagColIndex
,
int32_t
tableIndex
);
int32_t
setMeterID
(
STableMetaInfo
*
pTableMetaInfo
,
SSQLToken
*
pzTableName
,
SSqlObj
*
pSql
);
int32_t
tscSetTableId
(
STableMetaInfo
*
pTableMetaInfo
,
SSQLToken
*
pzTableName
,
SSqlObj
*
pSql
);
void
tscClearInterpInfo
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsInsertOrImportData
(
char
*
sqlstr
);
/* use for keep current db info temporarily, for handle table with db prefix */
// todo remove it
void
tscGetDBInfoFromMeterId
(
char
*
tableId
,
char
*
db
);
int
tscAllocPayload
(
SSqlCmd
*
pCmd
,
int
size
);
void
tscFieldInfoSetValFromSchema
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
SSchema
*
pSchema
);
void
tscFieldInfoSetValFromField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
pField
);
void
tscFieldInfoSetValue
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
int8_t
type
,
const
char
*
name
,
int16_t
bytes
);
void
tscFieldInfoUpdateVisible
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
bool
visible
);
void
tscFieldInfoSetExpr
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
SSqlExpr
*
pExpr
);
void
tscFieldInfoSetBinExpr
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
SSqlFunctionExpr
*
pExpr
);
TAOS_FIELD
tscCreateField
(
int8_t
type
,
const
char
*
name
,
int16_t
bytes
);
SFieldSupInfo
*
tscFieldInfoAppend
(
SFieldInfo
*
pFieldInfo
,
TAOS_FIELD
*
pField
);
SFieldSupInfo
*
tscFieldInfoInsert
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
field
);
SFieldSupInfo
*
tscFieldInfoGetSupp
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
void
tscFieldInfoCalOffset
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoCopy
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
,
const
int32_t
*
indexList
,
int32_t
size
);
void
tscFieldInfoCopyAll
(
SFieldInfo
*
dst
,
SFieldInfo
*
src
);
void
tscFieldInfoUpdateOffsetForInterResult
(
SQueryInfo
*
pQueryInfo
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
int32_t
tscGetResRowLength
(
SQueryInfo
*
pQueryInfo
);
void
tscClearFieldInfo
(
SFieldInfo
*
pFieldInfo
);
void
tscFieldInfoClear
(
SFieldInfo
*
pFieldInfo
);
int32_t
tscNumOfFields
(
SQueryInfo
*
pQueryInfo
);
int32_t
tscFieldInfoCompare
(
SFieldInfo
*
pFieldInfo1
,
SFieldInfo
*
pFieldInfo2
);
int32_t
tscFieldInfoCompare
(
const
SFieldInfo
*
pFieldInfo1
,
const
SFieldInfo
*
pFieldInfo2
);
void
addExprParams
(
SSqlExpr
*
pExpr
,
char
*
argument
,
int32_t
type
,
int32_t
bytes
,
int16_t
tableIndex
);
SSqlExpr
*
tscSqlExprInsert
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int32_t
tscGetResRowLength
(
SArray
*
pExprList
);
SSqlExpr
*
tscSqlExprInsert
(
SQueryInfo
*
pQueryInfo
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
interSize
);
SSqlExpr
*
tscSqlExpr
InsertEmpty
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
);
SSqlExpr
*
tscSqlExpr
Append
(
SArray
*
exprInfo
,
int16_t
functionId
);
SSqlExpr
*
tscSqlExprUpdate
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
int16_t
size
);
int32_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
SSqlExpr
*
tscSqlExprGet
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
void
tscSqlExprCopy
(
SSqlExprInfo
*
dst
,
const
SSqlExprInfo
*
src
,
uint64_t
uid
,
bool
deepcopy
);
void
*
tscSqlExprDestroy
(
SSqlExpr
*
pExpr
);
void
tscSqlExprInfoDestroy
(
SSqlExprInfo
*
pExprInfo
);
SArray
*
tscSqlExprCopy
(
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
void
tscSqlExprInfoDestroy
(
SArray
*
pExprInfo
);
SColumn
*
tscColumnClone
(
const
SColumn
*
src
);
SColumn
*
tscColumnListInsert
(
SArray
*
pColList
,
SColumnIndex
*
colIndex
);
...
...
@@ -184,16 +183,15 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void
tscSetFreeHeatBeat
(
STscObj
*
pObj
);
bool
tscShouldFreeHeatBeat
(
SSqlObj
*
pHb
);
void
tscCleanSqlCmd
(
SSqlCmd
*
pCmd
);
bool
tscShould
FreeAsyncSqlObj
(
SSqlObj
*
pSql
);
bool
tscShould
BeFreed
(
SSqlObj
*
pSql
);
void
tsc
RemoveAllMeter
MetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
);
void
tsc
ClearAllTable
MetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
);
STableMetaInfo
*
tscGetTableMetaInfoFromCmd
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
STableMetaInfo
*
tscGetMetaInfo
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
SQueryInfo
*
tscGetQueryInfoDetail
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
int32_t
tscGetQueryInfoDetailSafely
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
SQueryInfo
**
pQueryInfo
);
STableMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
uint64_t
uid
,
int32_t
*
index
);
void
tscClearMeterMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
,
bool
removeFromCache
);
STableMetaInfo
*
tscAddTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
name
,
STableMeta
*
pTableMeta
,
...
...
@@ -204,7 +202,6 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void
tscFreeSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscGetMetricMetaCacheKey
(
SQueryInfo
*
pQueryInfo
,
char
*
keyStr
,
uint64_t
uid
);
int
tscGetSTableVgroupInfo
(
SSqlObj
*
pSql
,
int32_t
clauseIndex
);
int
tscGetTableMeta
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
bool
createIfNotExists
);
...
...
src/client/inc/tsclient.h
浏览文件 @
47c2796b
...
...
@@ -34,8 +34,6 @@ extern "C" {
#include "tsqlfunction.h"
#include "queryExecutor.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
// forward declaration
struct
SSqlInfo
;
...
...
@@ -92,29 +90,18 @@ typedef struct SColumnIndex {
int16_t
columnIndex
;
}
SColumnIndex
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutputCols
;
// number of column in result
int16_t
numOfAlloc
;
// allocated size
TAOS_FIELD
*
pFields
;
typedef
struct
SFieldSupInfo
{
bool
visible
;
SArithExprInfo
*
pArithExprInfo
;
SSqlExpr
*
pSqlExpr
;
}
SFieldSupInfo
;
/*
* define if this column is belong to the queried result, it may be add by parser to faciliate
* the query process
*
* NOTE: these hidden columns always locate at the end of the output columns
*/
bool
*
pVisibleCols
;
int32_t
numOfHiddenCols
;
// the number of column not belongs to the queried result columns
SSqlFunctionExpr
**
pExpr
;
// used for aggregation arithmetic express,such as count(*)+count(*)
SSqlExpr
**
pSqlExpr
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
SArray
*
pFields
;
// SArray<TAOS_FIELD>
SArray
*
pSupportInfo
;
// SArray<SFieldSupInfo>
}
SFieldInfo
;
typedef
struct
SSqlExprInfo
{
int16_t
numOfAlloc
;
int16_t
numOfExprs
;
SSqlExpr
**
pExprs
;
}
SSqlExprInfo
;
typedef
struct
SColumn
{
SColumnIndex
colIndex
;
int32_t
numOfFilters
;
...
...
@@ -193,7 +180,7 @@ typedef struct STableDataBlocks {
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SDataBlockList
{
typedef
struct
SDataBlockList
{
// todo remove
uint32_t
nSize
;
uint32_t
nAlloc
;
STableDataBlocks
**
pData
;
...
...
@@ -209,9 +196,9 @@ typedef struct SQueryInfo {
int64_t
slidingTime
;
// sliding window in mseconds
SSqlGroupbyExpr
groupbyExpr
;
// group by tags info
SArray
*
colList
;
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
S
SqlExprInfo
exprsInfo
;
S
Array
*
exprsInfo
;
// SArray<SSqlExpr*>
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
...
...
@@ -441,6 +428,7 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
void
tscQueueAsyncFreeResult
(
SSqlObj
*
pSql
);
int32_t
tscToSQLCmd
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
char
*
tscGetResultColumnChr
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
,
int32_t
column
);
extern
void
*
pVnodeConn
;
extern
void
*
pTscMgmtConn
;
...
...
src/client/src/tscAsync.c
浏览文件 @
47c2796b
...
...
@@ -287,9 +287,9 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
){
S
SqlExpr
*
pExpr
=
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
i
]
;
if
(
pExpr
!=
NULL
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pQueryInfo
,
i
)
+
p
Expr
->
resBytes
*
pRes
->
row
;
S
FieldSupInfo
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
)
;
if
(
p
Sup
->
pSql
Expr
!=
NULL
)
{
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSql
Expr->resBytes * pRes->row;
}
else
{
//todo add
}
...
...
@@ -308,11 +308,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
{
SSqlExpr
*
pExpr
=
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
i
];
if
(
pExpr
!=
NULL
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pQueryInfo
,
i
)
+
pExpr
->
resBytes
*
pRes
->
row
;
SFieldSupInfo
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
if
(
pSup
->
pSqlExpr
!=
NULL
)
{
pRes
->
tsrow
[
i
]
=
tscGetResultColumnChr
(
pRes
,
pQueryInfo
,
i
);
}
else
{
//
todo add
//
todo add
}
}
...
...
@@ -332,7 +333,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
int
code
=
pRes
->
code
;
// in case of async insert, restore the user specified callback function
bool
shouldFree
=
tscShould
FreeAsyncSqlObj
(
pSql
);
bool
shouldFree
=
tscShould
BeFreed
(
pSql
);
if
(
cmd
==
TSDB_SQL_INSERT
)
{
assert
(
pSql
->
fp
!=
NULL
);
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
47c2796b
...
...
@@ -3297,7 +3297,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char
*
arithmetic_callback_function
(
void
*
param
,
char
*
name
,
int32_t
colId
)
{
SArithmeticSupport
*
pSupport
=
(
SArithmeticSupport
*
)
param
;
S
SqlFunctionExpr
*
pExpr
=
pSupport
->
p
Expr
;
S
ArithExprInfo
*
pExpr
=
pSupport
->
pArith
Expr
;
int32_t
colIndex
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pExpr
->
binExprInfo
.
numOfCols
;
++
i
)
{
...
...
@@ -3315,7 +3315,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO
(
pCtx
)
->
numOfRes
+=
pCtx
->
size
;
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
pCtx
->
param
[
1
].
pz
;
tSQLBinaryExprCalcTraverse
(
sas
->
pExpr
->
binExprInfo
.
pBinExpr
,
pCtx
->
size
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
tSQLBinaryExprCalcTraverse
(
sas
->
p
Arith
Expr
->
binExprInfo
.
pBinExpr
,
pCtx
->
size
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
arithmetic_callback_function
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
*
pCtx
->
size
;
...
...
@@ -3327,10 +3327,10 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
pCtx
->
param
[
1
].
pz
;
sas
->
offset
=
index
;
tSQLBinaryExprCalcTraverse
(
sas
->
pExpr
->
binExprInfo
.
pBinExpr
,
1
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
tSQLBinaryExprCalcTraverse
(
sas
->
p
Arith
Expr
->
binExprInfo
.
pBinExpr
,
1
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
arithmetic_callback_function
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/
;
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
#define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \
...
...
src/client/src/tscLocal.c
浏览文件 @
47c2796b
...
...
@@ -130,13 +130,13 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSchema
*
pSchema
=
tscGetTableSchema
(
pMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
0
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TSDB_COL_NAME_LEN
);
char
*
type
=
tDataTypeDesc
[
pSchema
[
i
].
type
].
aName
;
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
1
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
1
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
int32_t
bytes
=
pSchema
[
i
].
bytes
;
...
...
@@ -144,10 +144,10 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
bytes
=
bytes
/
TSDB_NCHAR_SIZE
;
}
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
2
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
2
);
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
3
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
3
);
if
(
i
>=
tscGetNumOfColumns
(
pMeta
)
&&
tscGetNumOfTags
(
pMeta
)
!=
0
)
{
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
"tag"
,
strlen
(
"tag"
)
+
1
);
...
...
@@ -162,18 +162,18 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char
*
pTagValue
=
tsGetTagsValue
(
pMeta
);
for
(
int32_t
i
=
numOfRows
;
i
<
totalNumOfRows
;
++
i
)
{
// field name
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
0
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
0
);
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
0
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
pSchema
[
i
].
name
,
TSDB_COL_NAME_LEN
);
// type name
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
1
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
1
);
char
*
type
=
tDataTypeDesc
[
pSchema
[
i
].
type
].
aName
;
strncpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
1
)
*
totalNumOfRows
+
pField
->
bytes
*
i
,
type
,
pField
->
bytes
);
// type length
int32_t
bytes
=
pSchema
[
i
].
bytes
;
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
2
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
2
);
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
)
{
bytes
=
bytes
/
TSDB_NCHAR_SIZE
;
}
...
...
@@ -181,7 +181,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
*
(
int32_t
*
)(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
2
)
*
totalNumOfRows
+
pField
->
bytes
*
i
)
=
bytes
;
// tag value
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
3
);
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
3
);
char
*
target
=
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
3
)
*
totalNumOfRows
+
pField
->
bytes
*
i
;
if
(
isNull
(
pTagValue
,
pSchema
[
i
].
type
))
{
...
...
@@ -236,31 +236,47 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
static
int32_t
tscBuildMeterSchemaResultFields
(
SSqlObj
*
pSql
,
int32_t
numOfCols
,
int32_t
typeColLength
,
int32_t
noteColLength
)
{
int32_t
rowLen
=
0
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
numOfCols
=
numOfCols
;
SColumnIndex
index
=
{
0
};
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pSql
->
cmd
.
numOfCols
=
numOfCols
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
"Field"
,
TSDB_COL_NAME_LEN
);
TAOS_FIELD
f
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
TSDB_COL_NAME_LEN
};
strncpy
(
f
.
name
,
"Field"
,
TSDB_COL_NAME_LEN
);
SFieldSupInfo
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprInsert
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
TSDB_COL_NAME_LEN
,
TSDB_COL_NAME_LEN
);
rowLen
+=
TSDB_COL_NAME_LEN
;
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
1
,
TSDB_DATA_TYPE_BINARY
,
"Type"
,
typeColLength
);
f
.
bytes
=
typeColLength
;
f
.
type
=
TSDB_DATA_TYPE_BINARY
;
strncpy
(
f
.
name
,
"Type"
,
TSDB_COL_NAME_LEN
);
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprInsert
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
typeColLength
,
typeColLength
);
rowLen
+=
typeColLength
;
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
2
,
TSDB_DATA_TYPE_INT
,
"Length"
,
sizeof
(
int32_t
));
f
.
bytes
=
sizeof
(
int32_t
);
f
.
type
=
TSDB_DATA_TYPE_INT
;
strncpy
(
f
.
name
,
"Length"
,
TSDB_COL_NAME_LEN
);
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprInsert
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
),
sizeof
(
int32_t
));
rowLen
+=
sizeof
(
int32_t
);
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
3
,
TSDB_DATA_TYPE_BINARY
,
"Note"
,
noteColLength
);
rowLen
+=
noteColLength
;
f
.
bytes
=
noteColLength
;
f
.
type
=
TSDB_DATA_TYPE_BINARY
;
strncpy
(
f
.
name
,
"Note"
,
TSDB_COL_NAME_LEN
);
//set the sqlexpr part
SColumnIndex
index
=
{
0
};
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
0
]
=
tscSqlExprInsert
(
pQueryInfo
,
0
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
TSDB_COL_NAME_LEN
,
TSDB_COL_NAME_LEN
);
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
1
]
=
tscSqlExprInsert
(
pQueryInfo
,
1
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
typeColLength
,
typeColLength
);
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
2
]
=
tscSqlExprInsert
(
pQueryInfo
,
2
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
),
sizeof
(
int32_t
));
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
3
]
=
tscSqlExprInsert
(
pQueryInfo
,
3
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
noteColLength
,
noteColLength
);
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprInsert
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
noteColLength
,
noteColLength
);
rowLen
+=
noteColLength
;
return
rowLen
;
}
...
...
@@ -310,7 +326,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
}
int32_t totalNumOfResults = pMetricMeta->numOfTables;
int32_t rowLen = tscGetResRowLength(pQueryInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo
->exprsInfo
);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
...
...
@@ -321,7 +337,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput
Cols
; ++k) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SColIndex *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx;
...
...
@@ -329,7 +345,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
assert(0);
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(
pQuery
Info, k);
TAOS_FIELD *pField = tscFieldInfoGetField(
&pQueryInfo->fields
Info, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
(size_t)pField->bytes);
...
...
@@ -350,17 +366,17 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo
->exprsInfo
);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
int32_t rowIdx = 0;
for (int32_t i = 0; i < totalNumOfResults; ++i) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput
Cols
; ++k) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->colInfo.colIdx == -1 && pExpr->functionId == TSDB_FUNC_COUNT) {
TAOS_FIELD *pField = tscFieldInfoGetField(
pQuery
Info, k);
TAOS_FIELD *pField = tscFieldInfoGetField(
&pQueryInfo->fields
Info, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx,
&pMetricMeta->numOfTables, sizeof(pMetricMeta->numOfTables));
...
...
@@ -388,7 +404,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
return
pSql
->
res
.
code
;
}
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_COUNT
)
{
return
tscBuildMetricTagSqlFunctionResult
(
pSql
);
}
else
{
...
...
@@ -399,7 +415,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
static
void
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
tscSetLocalQueryResult
(
pSql
,
pSql
->
pTscObj
->
user
,
pExpr
->
aliasName
,
TSDB_USER_LEN
);
}
...
...
@@ -414,7 +430,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
tscSetLocalQueryResult
(
pSql
,
db
,
pExpr
->
aliasName
,
TSDB_DB_NAME_LEN
);
}
...
...
@@ -422,14 +438,14 @@ static void tscProcessServerVer(SSqlObj *pSql) {
const
char
*
v
=
pSql
->
pTscObj
->
sversion
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
tscSetLocalQueryResult
(
pSql
,
v
,
pExpr
->
aliasName
,
tListLen
(
pSql
->
pTscObj
->
sversion
));
}
static
void
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
tscSetLocalQueryResult
(
pSql
,
version
,
pExpr
->
aliasName
,
strlen
(
version
));
}
...
...
@@ -449,7 +465,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
t
scSqlExprGet
(
pQuery
Info
,
0
);
SSqlExpr
*
pExpr
=
t
aosArrayGetP
(
pQueryInfo
->
exprs
Info
,
0
);
tscSetLocalQueryResult
(
pSql
,
"1"
,
pExpr
->
aliasName
,
2
);
}
...
...
@@ -462,13 +478,16 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
tscClearFieldInfo
(
&
pQueryInfo
->
fieldsInfo
);
tscFieldInfoClear
(
&
pQueryInfo
->
fieldsInfo
);
TAOS_FIELD
f
=
tscCreateField
(
TSDB_DATA_TYPE_BINARY
,
columnName
,
valueLength
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
tscFieldInfoSetValue
(
&
pQueryInfo
->
fieldsInfo
,
0
,
TSDB_DATA_TYPE_BINARY
,
columnName
,
valueLength
);
tscInitResObjForLocalQuery
(
pSql
,
1
,
valueLength
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQueryInfo
,
0
);
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
0
]
=
pQueryInfo
->
exprsInfo
.
pExprs
[
0
];
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
SFieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
0
);
pInfo
->
pSqlExpr
=
taosArrayGetP
(
pQueryInfo
->
exprsInfo
,
0
);
strncpy
(
pRes
->
data
,
val
,
pField
->
bytes
);
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
47c2796b
...
...
@@ -779,7 +779,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
STableMetaInfo
*
pSTableMeterMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
STABLE_INDEX
);
setMeterID
(
pSTableMeterMetaInfo
,
&
sToken
,
pSql
);
tscSetTableId
(
pSTableMeterMetaInfo
,
&
sToken
,
pSql
);
strncpy
(
pTag
->
name
,
pSTableMeterMetaInfo
->
name
,
TSDB_TABLE_ID_LEN
);
code
=
tscGetTableMeta
(
pSql
,
pSTableMeterMetaInfo
);
...
...
@@ -922,7 +922,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"invalid table name"
,
*
sqlstr
);
}
int32_t
ret
=
setMeterID
(
pTableMetaInfo
,
&
tableToken
,
pSql
);
int32_t
ret
=
tscSetTableId
(
pTableMetaInfo
,
&
tableToken
,
pSql
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
...
...
@@ -1059,7 +1059,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
}
if
((
code
=
setMeterID
(
pTableMetaInfo
,
&
sToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tscSetTableId
(
pTableMetaInfo
,
&
sToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error_clean
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
47c2796b
此差异已折叠。
点击以展开。
src/client/src/tscSecondaryMerge.c
浏览文件 @
47c2796b
...
...
@@ -61,7 +61,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pReducer
->
pCtx
[
i
];
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
...
...
@@ -108,10 +110,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
int16_t
n
=
0
;
int16_t
tagLen
=
0
;
SQLFunctionCtx
**
pTagCtx
=
calloc
(
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
,
POINTER_BYTES
);
SQLFunctionCtx
**
pTagCtx
=
calloc
(
pQueryInfo
->
fieldsInfo
.
numOfOutput
,
POINTER_BYTES
);
SQLFunctionCtx
*
pCtx
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pExpr
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tagLen
+=
pExpr
->
resBytes
;
...
...
@@ -254,8 +256,9 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format
pReducer
->
pCtx
=
(
SQLFunctionCtx
*
)
calloc
(
pQueryInfo
->
exprsInfo
.
numOfExprs
,
sizeof
(
SQLFunctionCtx
)
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
pReducer
->
pCtx
=
(
SQLFunctionCtx
*
)
calloc
(
size
,
sizeof
(
SQLFunctionCtx
));
pReducer
->
rowSize
=
pMemBuffer
[
0
]
->
nElemSize
;
tscRestoreSQLFunctionForMetricQuery
(
pQueryInfo
);
...
...
@@ -279,7 +282,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer
->
nResultBufSize
=
pMemBuffer
[
0
]
->
pageSize
*
16
;
pReducer
->
pResultBuf
=
(
tFilePage
*
)
calloc
(
1
,
pReducer
->
nResultBufSize
+
sizeof
(
tFilePage
));
int32_t
finalRowLength
=
tscGetResRowLength
(
pQueryInfo
);
int32_t
finalRowLength
=
tscGetResRowLength
(
pQueryInfo
->
exprsInfo
);
pReducer
->
resColModel
=
finalmodel
;
pReducer
->
resColModel
->
capacity
=
pReducer
->
nResultBufSize
/
finalRowLength
;
assert
(
finalRowLength
<=
pReducer
->
rowSize
);
...
...
@@ -301,7 +304,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
}
pReducer
->
pTempBuffer
->
numOfElems
=
0
;
pReducer
->
pResInfo
=
calloc
(
(
size_t
)
pQueryInfo
->
exprsInfo
.
numOfExprs
,
sizeof
(
SResultInfo
));
pReducer
->
pResInfo
=
calloc
(
size
,
sizeof
(
SResultInfo
));
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
);
tscInitSqlContext
(
pCmd
,
pRes
,
pReducer
,
pDesc
);
...
...
@@ -332,7 +335,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
taosInitInterpoInfo
(
pInterpoInfo
,
pQueryInfo
->
order
.
order
,
revisedSTime
,
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
,
pReducer
->
rowSize
);
int32_t
startIndex
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
int32_t
startIndex
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
>
0
)
{
pInterpoInfo
->
pTags
[
0
]
=
(
char
*
)
pInterpoInfo
->
pTags
+
POINTER_BYTES
*
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
...
...
@@ -462,7 +465,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosDestoryInterpoInfo
(
&
pLocalReducer
->
interpolationInfo
);
if
(
pLocalReducer
->
pCtx
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pLocalReducer
->
pCtx
[
i
];
tVariantDestroy
(
&
pCtx
->
tag
);
...
...
@@ -480,7 +483,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree
(
pLocalReducer
->
pResultBuf
);
if
(
pLocalReducer
->
pResInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
tfree
(
pLocalReducer
->
pResInfo
[
i
].
interResultBuf
);
}
...
...
@@ -532,7 +535,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
if
(
numOfGroupByCols
>
0
)
{
int32_t
startCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
int32_t
startCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
// tags value locate at the last columns
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
++
i
)
{
...
...
@@ -613,7 +616,9 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
return
pRes
->
code
;
}
pSchema
=
(
SSchema
*
)
calloc
(
1
,
sizeof
(
SSchema
)
*
pQueryInfo
->
exprsInfo
.
numOfExprs
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
pSchema
=
(
SSchema
*
)
calloc
(
1
,
sizeof
(
SSchema
)
*
size
);
if
(
pSchema
==
NULL
)
{
tscError
(
"%p failed to allocate memory"
,
pSql
);
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
...
...
@@ -621,7 +626,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
int32_t
rlen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
pSchema
[
i
].
bytes
=
pExpr
->
resBytes
;
...
...
@@ -635,7 +640,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
capacity
=
nBufferSizes
/
rlen
;
}
pModel
=
createColumnModel
(
pSchema
,
pQueryInfo
->
exprsInfo
.
numOfExprs
,
capacity
);
pModel
=
createColumnModel
(
pSchema
,
size
,
capacity
);
size_t
numOfSubs
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
for
(
int32_t
i
=
0
;
i
<
numOfSubs
;
++
i
)
{
...
...
@@ -649,8 +654,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
// final result depends on the fields number
memset
(
pSchema
,
0
,
sizeof
(
SSchema
)
*
pQueryInfo
->
exprsInfo
.
numOfExprs
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
memset
(
pSchema
,
0
,
sizeof
(
SSchema
)
*
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
SSchema
*
p1
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
pExpr
->
colInfo
.
colIndex
);
...
...
@@ -684,7 +689,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
strcpy
(
pSchema
[
i
].
name
,
pModel
->
pFields
[
i
].
field
.
name
);
}
*
pFinalModel
=
createColumnModel
(
pSchema
,
pQueryInfo
->
exprsInfo
.
numOfExprs
,
capacity
);
*
pFinalModel
=
createColumnModel
(
pSchema
,
size
,
capacity
);
tfree
(
pSchema
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -800,7 +805,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(
pQuery
Info, i);
// TAOS_FIELD *pField = tscFieldInfoGetField(
&pQueryInfo->fields
Info, i);
//
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
// char * src = pFinalDataPage->data + (pRes->numOfRows - 1) * pField->bytes + pRes->numOfRows * offset;
...
...
@@ -817,8 +822,10 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
static
void
reversedCopyFromInterpolationToDstBuf
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
,
tFilePage
**
pResPages
,
SLocalReducer
*
pLocalReducer
)
{
assert
(
0
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQueryInfo
,
i
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
int32_t
offset
=
tscFieldInfoGetOffset
(
pQueryInfo
,
i
);
assert
(
offset
==
getColumnModelOffset
(
pLocalReducer
->
resColModel
,
i
));
...
...
@@ -894,7 +901,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo
(
pLocalReducer
,
pQueryInfo
,
&
pLocalReducer
->
interpolationInfo
);
}
int32_t
rowSize
=
tscGetResRowLength
(
pQueryInfo
);
int32_t
rowSize
=
tscGetResRowLength
(
pQueryInfo
->
exprsInfo
);
memcpy
(
pRes
->
data
,
pFinalDataPage
->
data
,
pRes
->
numOfRows
*
rowSize
);
pFinalDataPage
->
numOfElems
=
0
;
...
...
@@ -907,16 +914,16 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
int64_t
actualETime
=
(
pQueryInfo
->
stime
<
pQueryInfo
->
etime
)
?
pQueryInfo
->
etime
:
pQueryInfo
->
stime
;
tFilePage
**
pResPages
=
malloc
(
POINTER_BYTES
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
tFilePage
**
pResPages
=
malloc
(
POINTER_BYTES
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
pResPages
[
i
]
=
calloc
(
1
,
sizeof
(
tFilePage
)
+
pField
->
bytes
*
pLocalReducer
->
resColModel
->
capacity
);
}
char
**
srcData
=
(
char
**
)
malloc
((
POINTER_BYTES
+
sizeof
(
int32_t
))
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
);
int32_t
*
functions
=
(
int32_t
*
)((
char
*
)
srcData
+
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
*
sizeof
(
void
*
));
char
**
srcData
=
(
char
**
)
malloc
((
POINTER_BYTES
+
sizeof
(
int32_t
))
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
);
int32_t
*
functions
=
(
int32_t
*
)((
char
*
)
srcData
+
pQueryInfo
->
fieldsInfo
.
numOfOutput
*
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
srcData
[
i
]
=
pLocalReducer
->
pBufForInterpo
+
tscFieldInfoGetOffset
(
pQueryInfo
,
i
)
*
pInterpoInfo
->
numOfRawDataInRows
;
functions
[
i
]
=
tscSqlExprGet
(
pQueryInfo
,
i
)
->
functionId
;
...
...
@@ -943,8 +950,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
newRows
-=
pQueryInfo
->
limit
.
offset
;
if
(
pQueryInfo
->
limit
.
offset
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
memmove
(
pResPages
[
i
]
->
data
,
pResPages
[
i
]
->
data
+
pField
->
bytes
*
pQueryInfo
->
limit
.
offset
,
newRows
*
pField
->
bytes
);
}
...
...
@@ -992,8 +999,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
}
if
(
pQueryInfo
->
order
.
order
==
TSDB_ORDER_ASC
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
int16_t
offset
=
getColumnModelOffset
(
pLocalReducer
->
resColModel
,
i
);
memcpy
(
pRes
->
data
+
offset
*
pRes
->
numOfRows
,
pResPages
[
i
]
->
data
,
pField
->
bytes
*
pRes
->
numOfRows
);
}
...
...
@@ -1003,7 +1010,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
}
pFinalDataPage
->
numOfElems
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
tfree
(
pResPages
[
i
]);
}
tfree
(
pResPages
);
...
...
@@ -1030,8 +1037,9 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static
void
doExecuteSecondaryMerge
(
SSqlCmd
*
pCmd
,
SLocalReducer
*
pLocalReducer
,
bool
needInit
)
{
// the tag columns need to be set before all functions execution
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
j
=
0
;
j
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
j
);
SQLFunctionCtx
*
pCtx
=
&
pLocalReducer
->
pCtx
[
j
];
...
...
@@ -1051,7 +1059,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
}
}
for
(
int32_t
j
=
0
;
j
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
int32_t
functionId
=
tscSqlExprGet
(
pQueryInfo
,
j
)
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
...
...
@@ -1072,7 +1080,8 @@ static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tF
static
int64_t
getNumOfResultLocal
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
)
{
int64_t
maxOutput
=
0
;
for
(
int32_t
j
=
0
;
j
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
j
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
...
...
@@ -1107,7 +1116,9 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
*/
static
void
fillMultiRowsOfTagsVal
(
SQueryInfo
*
pQueryInfo
,
int32_t
numOfRes
,
SLocalReducer
*
pLocalReducer
)
{
int32_t
maxBufSize
=
0
;
// find the max tags column length to prepare the buffer
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
if
(
maxBufSize
<
pExpr
->
resBytes
&&
pExpr
->
functionId
==
TSDB_FUNC_TAG
)
{
maxBufSize
=
pExpr
->
resBytes
;
...
...
@@ -1117,7 +1128,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
assert
(
maxBufSize
>=
0
);
char
*
buf
=
malloc
((
size_t
)
maxBufSize
);
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
if
(
pExpr
->
functionId
!=
TSDB_FUNC_TAG
)
{
continue
;
...
...
@@ -1139,7 +1150,9 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
}
int32_t
finalizeRes
(
SQueryInfo
*
pQueryInfo
,
SLocalReducer
*
pLocalReducer
)
{
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
aAggs
[
pExpr
->
functionId
].
xFinalize
(
&
pLocalReducer
->
pCtx
[
k
]);
}
...
...
@@ -1242,7 +1255,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
#endif
SInterpolationInfo
*
pInterpoInfo
=
&
pLocalReducer
->
interpolationInfo
;
int32_t
startIndex
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
int32_t
startIndex
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
++
i
)
{
int16_t
offset
=
getColumnModelOffset
(
pModel
,
startIndex
+
i
);
...
...
@@ -1258,7 +1271,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
}
void
resetOutputBuf
(
SQueryInfo
*
pQueryInfo
,
SLocalReducer
*
pLocalReducer
)
{
// reset output buffer to the beginning
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
pLocalReducer
->
pCtx
[
i
].
aOutputBuf
=
pLocalReducer
->
pResultBuf
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
i
)
*
pLocalReducer
->
resColModel
->
capacity
;
}
...
...
@@ -1386,8 +1399,9 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
SQLFunctionCtx
*
pCtx
=
&
pLocalReducer
->
pCtx
[
k
];
...
...
src/client/src/tscServer.c
浏览文件 @
47c2796b
...
...
@@ -359,13 +359,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
* may be freed in UDF, and reused by other threads before tscShould
FreeAsyncSqlObj
called, in which case
* tscShould
FreeAsyncSqlObj
checks an object which is actually allocated by other threads.
* may be freed in UDF, and reused by other threads before tscShould
BeFreed
called, in which case
* tscShould
BeFreed
checks an object which is actually allocated by other threads.
*
* If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
* the tscShould
FreeAsyncSqlObj
will success and tscFreeSqlObj free it immediately.
* the tscShould
BeFreed
will success and tscFreeSqlObj free it immediately.
*/
bool
shouldFree
=
tscShould
FreeAsyncSqlObj
(
pSql
);
bool
shouldFree
=
tscShould
BeFreed
(
pSql
);
(
*
pSql
->
fp
)(
pSql
->
param
,
taosres
,
rpcMsg
->
code
);
if
(
shouldFree
)
{
...
...
@@ -574,7 +574,9 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t
srcColListSize
=
taosArrayGetSize
(
pQueryInfo
->
colList
)
*
sizeof
(
SColumnInfo
);
int32_t
exprSize
=
sizeof
(
SSqlFuncExprMsg
)
*
pQueryInfo
->
exprsInfo
.
numOfExprs
;
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
int32_t
exprSize
=
sizeof
(
SSqlFuncMsg
)
*
numOfExprs
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
// meter query without tags values
...
...
@@ -583,20 +585,6 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
}
int32_t
size
=
4096
;
#if 0
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
if (pQueryInfo->tsBuf != NULL) {
size += pQueryInfo->tsBuf->fileSize;
}
#endif
return
size
;
}
...
...
@@ -674,17 +662,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql
->
ipList
.
ip
[
i
]
=
pVgroupInfo
->
ipAddr
[
i
].
ip
;
}
#if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;
numOfTables = pVnodeSidList->numOfSids;
if (numOfTables <= 0) {
tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
return -1; // error
}
#endif
tscTrace
(
"%p query on super table, numOfVgroup:%d, vgroupIndex:%d"
,
pSql
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
,
index
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
...
...
@@ -712,9 +689,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
numOfGroupCols
=
htons
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
pQueryMsg
->
queryType
=
htons
(
pQueryInfo
->
type
);
pQueryMsg
->
numOfOutputCols
=
htons
(
pQueryInfo
->
exprsInfo
.
numOfExprs
);
int32_t
numOfOutput
=
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
size_t
numOfOutput
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
pQueryMsg
->
numOfOutput
=
htons
(
numOfOutput
);
if
(
numOfOutput
<
0
)
{
tscError
(
"%p illegal value of number of output columns in query msg: %d"
,
pSql
,
numOfOutput
);
return
-
1
;
...
...
@@ -773,7 +750,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
bool
hasArithmeticFunction
=
false
;
SSqlFunc
ExprMsg
*
pSqlFuncExpr
=
(
SSqlFuncExpr
Msg
*
)
pMsg
;
SSqlFunc
Msg
*
pSqlFuncExpr
=
(
SSqlFunc
Msg
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
tscSqlExprNumOfExprs
(
pQueryInfo
);
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
...
...
@@ -793,7 +770,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr
->
functionId
=
htons
(
pExpr
->
functionId
);
pSqlFuncExpr
->
numOfParams
=
htons
(
pExpr
->
numOfParams
);
pMsg
+=
sizeof
(
SSqlFunc
Expr
Msg
);
pMsg
+=
sizeof
(
SSqlFuncMsg
);
for
(
int32_t
j
=
0
;
j
<
pExpr
->
numOfParams
;
++
j
)
{
pSqlFuncExpr
->
arg
[
j
].
argType
=
htons
((
uint16_t
)
pExpr
->
param
[
j
].
nType
);
...
...
@@ -809,7 +786,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
pSqlFuncExpr
=
(
SSqlFunc
Expr
Msg
*
)
pMsg
;
pSqlFuncExpr
=
(
SSqlFuncMsg
*
)
pMsg
;
}
int32_t
len
=
0
;
...
...
@@ -855,7 +832,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if
(
pQueryInfo
->
interpoType
!=
TSDB_INTERPO_NONE
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
*
((
int64_t
*
)
pMsg
)
=
htobe64
(
pQueryInfo
->
defaultVal
[
i
]);
pMsg
+=
sizeof
(
pQueryInfo
->
defaultVal
[
0
]);
}
...
...
@@ -1260,7 +1237,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSchema
=
(
SSchema
*
)
pCreateTableMsg
->
schema
;
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
+
pCmd
->
count
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
...
...
@@ -1279,7 +1256,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
tsc
ClearFieldInfo
(
&
pQueryInfo
->
fieldsInfo
);
tsc
FieldInfoClear
(
&
pQueryInfo
->
fieldsInfo
);
msgLen
=
pMsg
-
(
char
*
)
pCreateTableMsg
;
pCreateTableMsg
->
contLen
=
htonl
(
msgLen
);
...
...
@@ -1327,7 +1304,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema
*
pSchema
=
pAlterTableMsg
->
schema
;
for
(
int
i
=
0
;
i
<
tscNumOfFields
(
pQueryInfo
);
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
...
...
@@ -1386,9 +1363,9 @@ static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
return
pRes
->
code
;
}
for
(
int
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
int16_t
offset
=
tscFieldInfoGetOffset
(
pQueryInfo
,
i
);
pRes
->
tsrow
[
i
]
=
(
pRes
->
data
+
offset
*
pRes
->
numOfRows
);
pRes
->
tsrow
[
i
]
=
(
(
char
*
)
pRes
->
data
+
offset
*
pRes
->
numOfRows
);
}
return
0
;
...
...
@@ -2198,21 +2175,30 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pTableMetaInfo
->
pTableMeta
=
(
STableMeta
*
)
taosCachePut
(
tscCacheHandle
,
key
,
(
char
*
)
pTableMeta
,
size
,
tsMeterMetaKeepTimer
);
pCmd
->
numOfCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
pCmd
->
numOfCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
SSchema
*
pTableSchema
=
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
);
if
(
pQueryInfo
->
colList
==
NULL
)
{
pQueryInfo
->
colList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
}
SFieldInfo
*
pFieldInfo
=
&
pQueryInfo
->
fieldsInfo
;
SColumnIndex
index
=
{
0
};
for
(
int16_t
i
=
0
;
i
<
pMetaMsg
->
numOfColumns
;
++
i
)
{
index
.
columnIndex
=
i
;
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
);
tscFieldInfoSetValFromSchema
(
&
pQueryInfo
->
fieldsInfo
,
i
,
&
pTableSchema
[
i
]);
TAOS_FIELD
field
=
{
.
bytes
=
pSchema
->
bytes
,
.
type
=
pSchema
->
type
,
};
strncpy
(
field
.
name
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
tscFieldInfoAppend
(
pFieldInfo
,
&
field
);
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
i
]
=
tscSqlExprInsert
(
pQueryInfo
,
i
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
SFieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
pFieldInfo
,
i
);
pInfo
->
pSqlExpr
=
tscSqlExprInsert
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
pTableSchema
[
i
].
type
,
pTableSchema
[
i
].
bytes
,
pTableSchema
[
i
].
bytes
);
}
...
...
@@ -2327,7 +2313,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return
0
;
}
int
tscProcessRetrieveRspFrom
Vn
ode
(
SSqlObj
*
pSql
)
{
int
tscProcessRetrieveRspFrom
N
ode
(
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -2344,9 +2330,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
tscSetResultPointer
(
pQueryInfo
,
pRes
);
if
(
pSql
->
pSubscription
!=
NULL
)
{
int32_t
numOfCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
int32_t
numOfCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
numOfCols
-
1
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
numOfCols
-
1
);
int16_t
offset
=
tscFieldInfoGetOffset
(
pQueryInfo
,
numOfCols
-
1
);
char
*
p
=
pRes
->
data
+
(
pField
->
bytes
+
offset
)
*
pRes
->
numOfRows
;
...
...
@@ -2629,7 +2615,7 @@ void tscInitMsgsFp() {
tscBuildMsg
[
TSDB_SQL_KILL_CONNECTION
]
=
tscBuildKillMsg
;
tscProcessMsgRsp
[
TSDB_SQL_SELECT
]
=
tscProcessQueryRsp
;
tscProcessMsgRsp
[
TSDB_SQL_FETCH
]
=
tscProcessRetrieveRspFrom
Vn
ode
;
tscProcessMsgRsp
[
TSDB_SQL_FETCH
]
=
tscProcessRetrieveRspFrom
N
ode
;
tscProcessMsgRsp
[
TSDB_SQL_DROP_DB
]
=
tscProcessDropDbRsp
;
tscProcessMsgRsp
[
TSDB_SQL_DROP_TABLE
]
=
tscProcessDropTableRsp
;
...
...
@@ -2640,7 +2626,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp
[
TSDB_SQL_MULTI_META
]
=
tscProcessMultiMeterMetaRsp
;
tscProcessMsgRsp
[
TSDB_SQL_SHOW
]
=
tscProcessShowRsp
;
tscProcessMsgRsp
[
TSDB_SQL_RETRIEVE
]
=
tscProcessRetrieveRspFrom
Vn
ode
;
// rsp handled by same function.
tscProcessMsgRsp
[
TSDB_SQL_RETRIEVE
]
=
tscProcessRetrieveRspFrom
N
ode
;
// rsp handled by same function.
tscProcessMsgRsp
[
TSDB_SQL_DESCRIBE_TABLE
]
=
tscProcessDescribeTableRsp
;
tscProcessMsgRsp
[
TSDB_SQL_RETRIEVE_TAGS
]
=
tscProcessTagRetrieveRsp
;
...
...
src/client/src/tscSql.c
浏览文件 @
47c2796b
...
...
@@ -326,16 +326,21 @@ int taos_num_fields(TAOS_RES *res) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
int32_t
num
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
0
;
return
num
;
}
SFieldInfo
*
pFieldsInfo
=
&
pQueryInfo
->
fieldsInfo
;
if
(
pFieldsInfo
)
return
(
pFieldsInfo
->
numOfOutputCols
-
pFieldsInfo
->
numOfHiddenCols
);
else
return
0
;
size_t
numOfCols
=
tscNumOfFields
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SFieldSupInfo
*
pInfo
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
if
(
pInfo
->
visible
)
{
num
++
;
}
}
return
num
;
}
int
taos_field_count
(
TAOS
*
taos
)
{
...
...
@@ -357,11 +362,16 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
NULL
;
}
if
(
pQueryInfo
)
return
pQueryInfo
->
fieldsInfo
.
pFields
;
else
size_t
numOfCols
=
tscNumOfFields
(
pQueryInfo
);
if
(
numOfCols
==
0
)
{
return
NULL
;
}
return
pQueryInfo
->
fieldsInfo
.
pFields
->
pData
;
}
int
taos_retrieve
(
TAOS_RES
*
res
)
{
...
...
@@ -414,8 +424,8 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
if
(
pQueryInfo
==
NULL
)
return
0
;
for
(
int
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pQueryInfo
,
i
);
for
(
int
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
pRes
->
tsrow
[
i
]
=
tscGetResultColumnChr
(
pRes
,
pQueryInfo
,
i
);
}
*
rows
=
pRes
->
tsrow
;
...
...
@@ -448,7 +458,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
static
char
*
getArithemicInputSrc
(
void
*
param
,
char
*
name
,
int32_t
colId
)
{
SArithmeticSupport
*
pSupport
=
(
SArithmeticSupport
*
)
param
;
S
SqlFunctionExpr
*
pExpr
=
pSupport
->
p
Expr
;
S
ArithExprInfo
*
pExpr
=
pSupport
->
pArith
Expr
;
int32_t
index
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pExpr
->
binExprInfo
.
numOfCols
;
++
i
)
{
...
...
@@ -476,7 +486,8 @@ static void **doSetResultRowData(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
//todo refactor move away
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
numOfExprs
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
if
(
k
>
0
)
{
...
...
@@ -487,9 +498,9 @@ static void **doSetResultRowData(SSqlObj *pSql) {
int32_t
num
=
0
;
for
(
int
i
=
0
;
i
<
tscNumOfFields
(
pQueryInfo
);
++
i
)
{
if
(
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
i
]
!=
NULL
)
{
SSqlExpr
*
pExpr
=
pQueryInfo
->
fieldsInfo
.
pSqlExpr
[
i
];
pRes
->
tsrow
[
i
]
=
TSC_GET_RESPTR_BASE
(
pRes
,
pQueryInfo
,
i
)
+
p
Expr
->
resBytes
*
pRes
->
row
;
SFieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
i
);
if
(
pInfo
->
pSqlExpr
!=
NULL
)
{
pRes
->
tsrow
[
i
]
=
tscGetResultColumnChr
(
pRes
,
pQueryInfo
,
i
)
+
pInfo
->
pSql
Expr
->
resBytes
*
pRes
->
row
;
}
else
{
assert
(
0
);
}
...
...
@@ -499,37 +510,37 @@ static void **doSetResultRowData(SSqlObj *pSql) {
continue
;
}
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
transferNcharData
(
pSql
,
i
,
pField
);
// calculate the result from se
r
veral other columns
if
(
p
QueryInfo
->
fieldsInfo
.
pExpr
!=
NULL
&&
pQueryInfo
->
fieldsInfo
.
pExpr
[
i
]
!=
NULL
)
{
// calculate the result from several other columns
if
(
p
Info
->
pArithExprInfo
!=
NULL
)
{
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
calloc
(
1
,
sizeof
(
SArithmeticSupport
));
sas
->
offset
=
0
;
sas
->
p
Expr
=
pQueryInfo
->
fieldsInfo
.
pExpr
[
i
]
;
sas
->
p
ArithExpr
=
pInfo
->
pArithExprInfo
;
sas
->
numOfCols
=
sas
->
pExpr
->
binExprInfo
.
numOfCols
;
sas
->
numOfCols
=
sas
->
p
Arith
Expr
->
binExprInfo
.
numOfCols
;
if
(
pRes
->
buffer
[
i
]
==
NULL
)
{
pRes
->
buffer
[
i
]
=
malloc
(
tscFieldInfoGetField
(
pQuery
Info
,
i
)
->
bytes
);
pRes
->
buffer
[
i
]
=
malloc
(
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
)
->
bytes
);
}
for
(
int32_t
k
=
0
;
k
<
sas
->
numOfCols
;
++
k
)
{
int32_t
columnIndex
=
sas
->
pExpr
->
binExprInfo
.
pReqColumns
[
k
].
colIndex
;
int32_t
columnIndex
=
sas
->
p
Arith
Expr
->
binExprInfo
.
pReqColumns
[
k
].
colIndex
;
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
columnIndex
);
sas
->
elemSize
[
k
]
=
pExpr
->
resBytes
;
sas
->
data
[
k
]
=
(
pRes
->
data
+
pRes
->
numOfRows
*
pExpr
->
offset
)
+
pRes
->
row
*
pExpr
->
resBytes
;
}
tSQLBinaryExprCalcTraverse
(
sas
->
pExpr
->
binExprInfo
.
pBinExpr
,
1
,
pRes
->
buffer
[
i
],
sas
,
TSDB_ORDER_ASC
,
getArithemicInputSrc
);
tSQLBinaryExprCalcTraverse
(
sas
->
p
Arith
Expr
->
binExprInfo
.
pBinExpr
,
1
,
pRes
->
buffer
[
i
],
sas
,
TSDB_ORDER_ASC
,
getArithemicInputSrc
);
pRes
->
tsrow
[
i
]
=
pRes
->
buffer
[
i
];
free
(
sas
);
//todo optimization
}
}
assert
(
num
<=
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
);
assert
(
num
<=
pQueryInfo
->
fieldsInfo
.
numOfOutput
);
pRes
->
row
++
;
// index increase one-step
return
pRes
->
tsrow
;
...
...
@@ -594,8 +605,10 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
while
(
1
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
if
(
pRes
->
tsrow
==
NULL
)
{
pRes
->
tsrow
=
calloc
(
pQueryInfo
->
exprsInfo
.
numOfExprs
,
POINTER_BYTES
);
pRes
->
tsrow
=
calloc
(
numOfExprs
,
POINTER_BYTES
);
}
bool
success
=
false
;
...
...
@@ -619,7 +632,9 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if
(
success
)
{
// current row of final output has been built, return to app
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
exprsInfo
.
numOfExprs
;
++
i
)
{
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
int32_t
tableIndex
=
pRes
->
pColumnIndex
[
i
].
tableIndex
;
int32_t
columnIndex
=
pRes
->
pColumnIndex
[
i
].
columnIndex
;
...
...
@@ -1126,7 +1141,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return
code
;
}
if
((
code
=
setMeterID
(
pTableMetaInfo
,
&
sToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tscSetTableId
(
pTableMetaInfo
,
&
sToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
src/client/src/tscStream.c
浏览文件 @
47c2796b
...
...
@@ -36,7 +36,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
}
static
bool
isProjectStream
(
SQueryInfo
*
pQueryInfo
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
!=
TSDB_FUNC_PRJ
)
{
return
false
;
...
...
@@ -219,9 +219,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
void
*
oldPtr
=
pSql
->
res
.
data
;
pSql
->
res
.
data
=
tmpRes
;
for
(
int32_t
i
=
1
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
1
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
int16_t
offset
=
tscFieldInfoGetOffset
(
pQueryInfo
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQuery
Info
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fields
Info
,
i
);
assignVal
(
pSql
->
res
.
data
+
offset
,
(
char
*
)(
&
pQueryInfo
->
defaultVal
[
i
]),
pField
->
bytes
,
pField
->
type
);
row
[
i
]
=
pSql
->
res
.
data
+
offset
;
...
...
@@ -231,7 +231,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
row
[
0
]
=
pRes
->
data
;
// char result[512] = {0};
// taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutput
Cols
);
// taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutput);
// tscPrint("%p stream:%p query result: %s", pSql, pStream, result);
tscTrace
(
"%p stream:%p fetch result"
,
pSql
,
pStream
);
...
...
src/client/src/tscSubquery.c
浏览文件 @
47c2796b
...
...
@@ -190,10 +190,10 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
return
;
}
tscSqlExprInfoDestroy
(
&
pSupporter
->
exprsInfo
);
tscSqlExprInfoDestroy
(
pSupporter
->
exprsInfo
);
tscColumnListDestroy
(
pSupporter
->
colList
);
tsc
ClearFieldInfo
(
&
pSupporter
->
fieldsInfo
);
tsc
FieldInfoClear
(
&
pSupporter
->
fieldsInfo
);
if
(
pSupporter
->
f
!=
NULL
)
{
fclose
(
pSupporter
->
f
);
...
...
@@ -238,7 +238,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
pSupporter
=
pSql
->
pSubs
[
i
]
->
param
;
if
(
pSupporter
->
exprsInfo
.
numOfExprs
>
0
)
{
if
(
taosArrayGetSize
(
pSupporter
->
exprsInfo
)
>
0
)
{
++
numOfSub
;
}
}
...
...
@@ -264,7 +264,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter
=
pPrevSub
->
param
;
if
(
pSupporter
->
exprsInfo
.
numOfExprs
==
0
)
{
if
(
taosArrayGetSize
(
pSupporter
->
exprsInfo
)
==
0
)
{
tscTrace
(
"%p subIndex: %d, not need to launch query, ignore it"
,
pSql
,
i
);
tscDestroyJoinSupporter
(
pSupporter
);
...
...
@@ -279,7 +279,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSubQueryInfo
->
tsBuf
=
NULL
;
// free result for async object will also free sqlObj
assert
(
pSubQueryInfo
->
exprsInfo
.
numOfExprs
==
1
);
// ts_comp query only requires one resutl columns
assert
(
tscSqlExprNumOfExprs
(
pSubQueryInfo
)
==
1
);
// ts_comp query only requires one resutl columns
taos_free_result
(
pPrevSub
);
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
(
int16_t
)
i
,
tscJoinQueryCallback
,
pSupporter
,
TSDB_SQL_SELECT
,
NULL
);
...
...
@@ -304,17 +304,17 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscColumnListAssign
(
pQueryInfo
->
colList
,
pSupporter
->
colList
,
0
);
tscTagCondCopy
(
&
pQueryInfo
->
tagCond
,
&
pSupporter
->
tagCond
);
tscSqlExprCopy
(
&
pQueryInfo
->
exprsInfo
,
&
pSupporter
->
exprsInfo
,
pSupporter
->
uid
,
false
);
pQueryInfo
->
exprsInfo
=
tscSqlExprCopy
(
pSupporter
->
exprsInfo
,
pSupporter
->
uid
,
false
);
tscFieldInfoCopyAll
(
&
pQueryInfo
->
fieldsInfo
,
&
pSupporter
->
fieldsInfo
);
pSupporter
->
exprsInfo
.
numOfExprs
=
0
;
pSupporter
->
fieldsInfo
.
numOfOutputCols
=
0
;
pSupporter
->
fieldsInfo
.
numOfOutput
=
0
;
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
if
(
pSupporter
->
exprsInfo
.
pExprs
[
0
]
->
functionId
!=
TSDB_FUNC_TS
)
{
SSqlExpr
*
pExpr
=
taosArrayGet
(
pSupporter
->
exprsInfo
,
0
);
if
(
pExpr
->
functionId
!=
TSDB_FUNC_TS
)
{
tscAddTimestampColumn
(
pQueryInfo
,
TSDB_FUNC_TS
,
0
);
}
...
...
@@ -347,8 +347,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
size_t
numOfCols
=
taosArrayGetSize
(
pNewQueryInfo
->
colList
);
tscTrace
(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s"
,
pSql
,
pNew
,
0
,
pTableMetaInfo
->
vgroupIndex
,
pNewQueryInfo
->
type
,
pNewQueryInfo
->
exprsInfo
.
numOfExprs
,
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
taosArrayGetSize
(
pNewQueryInfo
->
exprsInfo
)
,
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
}
//prepare the subqueries object failed, abort
...
...
@@ -691,9 +691,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pRes
->
pColumnIndex
=
calloc
(
1
,
sizeof
(
SColumnIndex
)
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
);
pRes
->
pColumnIndex
=
calloc
(
1
,
sizeof
(
SColumnIndex
)
*
pQueryInfo
->
fieldsInfo
.
numOfOutput
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
int32_t
tableIndexOfSub
=
-
1
;
...
...
@@ -710,7 +710,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd
*
pSubCmd
=
&
pSql
->
pSubs
[
tableIndexOfSub
]
->
cmd
;
SQueryInfo
*
pSubQueryInfo
=
tscGetQueryInfoDetail
(
pSubCmd
,
0
);
for
(
int32_t
k
=
0
;
k
<
pSubQueryInfo
->
exprsInfo
.
numOfExprs
;
++
k
)
{
size_t
numOfExprs
=
taosArrayGetSize
(
pSubQueryInfo
->
exprsInfo
);
for
(
int32_t
k
=
0
;
k
<
numOfExprs
;
++
k
)
{
SSqlExpr
*
pSubExpr
=
tscSqlExprGet
(
pSubQueryInfo
,
k
);
if
(
pExpr
->
functionId
==
pSubExpr
->
functionId
&&
pExpr
->
colInfo
.
colId
==
pSubExpr
->
colInfo
.
colId
)
{
pRes
->
pColumnIndex
[
i
]
=
(
SColumnIndex
){.
tableIndex
=
tableIndexOfSub
,
.
columnIndex
=
k
};
...
...
@@ -726,11 +727,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// int32_t idx = pSql->cmd.vnodeIdx;
// SVnodeSidList *vnodeInfo = NULL;
// if (pTableMetaInfo->pMetricMeta != NULL) {
// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx - 1);
// }
SJoinSubquerySupporter
*
pSupporter
=
(
SJoinSubquerySupporter
*
)
param
;
// if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >=
...
...
@@ -862,7 +858,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscColumnListAssign
(
pSupporter
->
colList
,
pNewQueryInfo
->
colList
,
0
);
tscSqlExprCopy
(
&
pSupporter
->
exprsInfo
,
&
pNewQueryInfo
->
exprsInfo
,
pSupporter
->
uid
,
false
);
pSupporter
->
exprsInfo
=
tscSqlExprCopy
(
pNewQueryInfo
->
exprsInfo
,
pSupporter
->
uid
,
false
);
tscFieldInfoCopyAll
(
&
pSupporter
->
fieldsInfo
,
&
pNewQueryInfo
->
fieldsInfo
);
tscTagCondCopy
(
&
pSupporter
->
tagCond
,
&
pNewQueryInfo
->
tagCond
);
...
...
@@ -876,8 +872,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
memset
(
&
pNewQueryInfo
->
groupbyExpr
,
0
,
sizeof
(
SSqlGroupbyExpr
));
// this data needs to be transfer to support struct
pNewQueryInfo
->
fieldsInfo
.
numOfOutputCols
=
0
;
pNewQueryInfo
->
exprsInfo
.
numOfExprs
=
0
;
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
=
0
;
// set the ts,tags that involved in join, as the output column of intermediate result
tscClearSubqueryInfo
(
&
pNew
->
cmd
);
...
...
@@ -913,8 +908,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscTrace
(
"%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s"
,
pSql
,
pNew
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
,
pNewQueryInfo
->
type
,
pNewQueryInfo
->
exprsInfo
.
numOfExprs
,
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
Cols
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
tscSqlExprNumOfExprs
(
pNewQueryInfo
)
,
numOfCols
,
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
,
pNewQueryInfo
->
pTableMetaInfo
[
0
]
->
name
);
tscPrintSelectClause
(
pNew
,
0
);
}
else
{
...
...
src/client/src/tscUtil.c
浏览文件 @
47c2796b
此差异已折叠。
点击以展开。
src/inc/taosmsg.h
浏览文件 @
47c2796b
...
...
@@ -372,7 +372,7 @@ typedef struct SColIndex {
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef
struct
SSqlFunc
Expr
Msg
{
typedef
struct
SSqlFuncMsg
{
int16_t
functionId
;
int16_t
numOfParams
;
...
...
@@ -386,21 +386,21 @@ typedef struct SSqlFuncExprMsg {
char
*
pz
;
}
argValue
;
}
arg
[
3
];
}
SSqlFunc
Expr
Msg
;
}
SSqlFuncMsg
;
typedef
struct
S
SqlBinary
ExprInfo
{
typedef
struct
SExprInfo
{
struct
tExprNode
*
pBinExpr
;
/* for binary expression */
int32_t
numOfCols
;
/* binary expression involves the readed number of columns*/
SColIndex
*
pReqColumns
;
/* source column list */
}
S
SqlBinary
ExprInfo
;
}
SExprInfo
;
typedef
struct
S
SqlFunctionExpr
{
SSqlFunc
ExprMsg
pBase
;
S
SqlBinaryExprInfo
binExprInfo
;
int16_t
resB
ytes
;
int16_t
resT
ype
;
typedef
struct
S
ArithExprInfo
{
SSqlFunc
Msg
pBase
;
S
ExprInfo
binExprInfo
;
int16_t
b
ytes
;
int16_t
t
ype
;
int16_t
interResBytes
;
}
S
SqlFunctionExpr
;
}
S
ArithExprInfo
;
typedef
struct
SColumnFilterInfo
{
int16_t
lowerRelOptr
;
...
...
@@ -469,7 +469,7 @@ typedef struct {
int64_t
limit
;
int64_t
offset
;
uint16_t
queryType
;
// denote another query process
int16_t
numOfOutput
Cols
;
// final output columns numbers
int16_t
numOfOutput
;
// final output columns numbers
int16_t
interpoType
;
// interpolate type
uint64_t
defaultVal
;
// default value array list
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录