Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
8de14a0c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8de14a0c
编写于
12月 06, 2019
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[tbase-1282]
上级
372ac9c2
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
268 addition
and
179 deletion
+268
-179
src/client/inc/tscSecondaryMerge.h
src/client/inc/tscSecondaryMerge.h
+1
-1
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+34
-35
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+37
-37
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+18
-13
src/client/src/tscJoinProcess.c
src/client/src/tscJoinProcess.c
+99
-31
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+9
-6
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+6
-2
src/client/src/tscServer.c
src/client/src/tscServer.c
+37
-37
src/client/src/tscSql.c
src/client/src/tscSql.c
+12
-4
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+15
-13
未找到文件。
src/client/inc/tscSecondaryMerge.h
浏览文件 @
8de14a0c
...
...
@@ -94,7 +94,7 @@ typedef struct SRetrieveSupport {
tOrderDescriptor
*
pOrderDescriptor
;
tColModel
*
pFinalColModel
;
// colModel for final result
SSubqueryState
*
pState
;
int32_t
vnodeId
x
;
// index of current vnode in vnode list
int32_t
subqueryInde
x
;
// index of current vnode in vnode list
SSqlObj
*
pParentSqlObj
;
tFilePage
*
localBuffer
;
// temp buffer, there is a buffer for each vnode to
uint32_t
numOfRetry
;
// record the number of retry times
...
...
src/client/inc/tscUtil.h
浏览文件 @
8de14a0c
...
...
@@ -26,11 +26,12 @@ extern "C" {
#include <limits.h>
#include <stdio.h>
#include "textbuffer.h"
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#include "tsdb.h"
#include "tscSecondaryMerge.h"
#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
...
...
@@ -62,28 +63,27 @@ typedef struct SJoinSubquerySupporter {
SFieldInfo
fieldsInfo
;
STagCond
tagCond
;
SSqlGroupbyExpr
groupbyExpr
;
struct
STSBuf
*
pTSBuf
;
FILE
*
f
;
char
path
[
PATH_MAX
];
struct
STSBuf
*
pTSBuf
;
// the TSBuf struct that holds the compressed timestamp array
FILE
*
f
;
// temporary file in order to create TSBuf
char
path
[
PATH_MAX
];
// temporary file path
}
SJoinSubquerySupporter
;
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
);
STableDataBlocks
*
tscCreateDataBlock
(
int32_t
size
);
void
tscAppendDataBlock
(
SDataBlockList
*
pList
,
STableDataBlocks
*
pBlocks
);
SParamInfo
*
tscAddParamToDataBlock
(
STableDataBlocks
*
pDataBlock
,
char
type
,
uint8_t
timePrec
,
short
bytes
,
uint32_t
offset
);
SDataBlockList
*
tscCreateBlockArrayList
();
void
*
tscDestroyBlockArrayList
(
SDataBlockList
*
pList
);
int32_t
tscCopyDataBlockToPayload
(
SSqlObj
*
pSql
,
STableDataBlocks
*
pDataBlock
);
void
tscFreeUnusedDataBlocks
(
SDataBlockList
*
pList
);
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SDataBlockList
*
pDataList
);
void
tscAppendDataBlock
(
SDataBlockList
*
pList
,
STableDataBlocks
*
pBlocks
);
SParamInfo
*
tscAddParamToDataBlock
(
STableDataBlocks
*
pDataBlock
,
char
type
,
uint8_t
timePrec
,
short
bytes
,
uint32_t
offset
);
SDataBlockList
*
tscCreateBlockArrayList
();
void
*
tscDestroyBlockArrayList
(
SDataBlockList
*
pList
);
int32_t
tscCopyDataBlockToPayload
(
SSqlObj
*
pSql
,
STableDataBlocks
*
pDataBlock
);
void
tscFreeUnusedDataBlocks
(
SDataBlockList
*
pList
);
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SDataBlockList
*
pDataList
);
STableDataBlocks
*
tscGetDataBlockFromList
(
void
*
pHashList
,
SDataBlockList
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
char
*
tableId
);
STableDataBlocks
*
tscCreateDataBlockEx
(
size_t
size
,
int32_t
rowSize
,
int32_t
startOffset
,
char
*
name
);
SVnodeSidList
*
tscGetVnodeSidList
(
SMetricMeta
*
pMetricmeta
,
int32_t
vnodeIdx
);
SVnodeSidList
*
tscGetVnodeSidList
(
SMetricMeta
*
pMetricmeta
,
int32_t
vnodeIdx
);
SMeterSidExtInfo
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
);
/**
...
...
@@ -108,7 +108,7 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t
void
addRequiredTagColumn
(
SSqlCmd
*
pCmd
,
int32_t
tagColIndex
,
int32_t
tableIndex
);
int32_t
setMeterID
(
SSqlObj
*
pSql
,
SSQLToken
*
pzTableName
,
int32_t
tableIndex
);
void
tscClearInterpInfo
(
SSqlCmd
*
pCmd
);
void
tscClearInterpInfo
(
SSqlCmd
*
pCmd
);
bool
tscIsInsertOrImportData
(
char
*
sqlstr
);
...
...
@@ -128,9 +128,9 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
void
tscFieldInfoCopyAll
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SSqlCmd
*
pCmd
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
SSqlCmd
*
pCmd
,
int32_t
index
);
int32_t
tscGetResRowLength
(
SSqlCmd
*
pCmd
);
void
tscClearFieldInfo
(
SFieldInfo
*
pFieldInfo
);
int16_t
tscFieldInfoGetOffset
(
SSqlCmd
*
pCmd
,
int32_t
index
);
int32_t
tscGetResRowLength
(
SSqlCmd
*
pCmd
);
void
tscClearFieldInfo
(
SFieldInfo
*
pFieldInfo
);
void
addExprParams
(
SSqlExpr
*
pExpr
,
char
*
argument
,
int32_t
type
,
int32_t
bytes
,
int16_t
tableIndex
);
...
...
@@ -142,15 +142,15 @@ SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int
int16_t
size
);
SSqlExpr
*
tscSqlExprGet
(
SSqlCmd
*
pCmd
,
int32_t
index
);
void
tscSqlExprCopy
(
SSqlExprInfo
*
dst
,
const
SSqlExprInfo
*
src
,
uint64_t
uid
);
void
tscSqlExprCopy
(
SSqlExprInfo
*
dst
,
const
SSqlExprInfo
*
src
,
uint64_t
uid
);
SColumnBase
*
tscColumnBaseInfoInsert
(
SSqlCmd
*
pCmd
,
SColumnIndex
*
colIndex
);
void
tscColumnFilterInfoCopy
(
SColumnFilterInfo
*
dst
,
const
SColumnFilterInfo
*
src
);
void
tscColumnBaseCopy
(
SColumnBase
*
dst
,
const
SColumnBase
*
src
);
void
tscColumnFilterInfoCopy
(
SColumnFilterInfo
*
dst
,
const
SColumnFilterInfo
*
src
);
void
tscColumnBaseCopy
(
SColumnBase
*
dst
,
const
SColumnBase
*
src
);
void
tscColumnBaseInfoCopy
(
SColumnBaseInfo
*
dst
,
const
SColumnBaseInfo
*
src
,
int16_t
tableIndex
);
void
tscColumnBaseInfoCopy
(
SColumnBaseInfo
*
dst
,
const
SColumnBaseInfo
*
src
,
int16_t
tableIndex
);
SColumnBase
*
tscColumnBaseInfoGet
(
SColumnBaseInfo
*
pColumnBaseInfo
,
int32_t
index
);
void
tscColumnBaseInfoUpdateTableIndex
(
SColumnBaseInfo
*
pColList
,
int16_t
tableIndex
);
void
tscColumnBaseInfoUpdateTableIndex
(
SColumnBaseInfo
*
pColList
,
int16_t
tableIndex
);
void
tscColumnBaseInfoReserve
(
SColumnBaseInfo
*
pColumnBaseInfo
,
int32_t
size
);
void
tscColumnBaseInfoDestroy
(
SColumnBaseInfo
*
pColumnBaseInfo
);
...
...
@@ -163,7 +163,7 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond
*
tsGetMetricQueryCondPos
(
STagCond
*
pCond
,
uint64_t
tableIndex
);
void
tsSetMetricQueryCond
(
STagCond
*
pTagCond
,
uint64_t
uid
,
const
char
*
str
);
void
tsSetMetricQueryCond
(
STagCond
*
pTagCond
,
uint64_t
uid
,
const
char
*
str
);
void
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
void
tscTagCondRelease
(
STagCond
*
pCond
);
...
...
@@ -176,19 +176,19 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void
tscCleanSqlCmd
(
SSqlCmd
*
pCmd
);
bool
tscShouldFreeAsyncSqlObj
(
SSqlObj
*
pSql
);
void
tscRemoveAllMeterMetaInfo
(
SSqlCmd
*
pCmd
,
bool
removeFromCache
);
void
tscRemoveAllMeterMetaInfo
(
SSqlCmd
*
pCmd
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscGetMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
index
);
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SSqlCmd
*
pCmd
,
uint64_t
uid
,
int32_t
*
index
);
void
tscClearMeterMetaInfo
(
SMeterMetaInfo
*
pMeterMetaInfo
,
bool
removeFromCache
);
void
tscClearMeterMetaInfo
(
SMeterMetaInfo
*
pMeterMetaInfo
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscAddMeterMetaInfo
(
SSqlCmd
*
pCmd
,
const
char
*
name
,
SMeterMeta
*
pMeterMeta
,
SMetricMeta
*
pMetricMeta
,
int16_t
numOfTags
,
int16_t
*
tags
);
SMeterMetaInfo
*
tscAddEmptyMeterMetaInfo
(
SSqlCmd
*
pCmd
);
void
tscGetMetricMetaCacheKey
(
SSqlCmd
*
pCmd
,
char
*
keyStr
,
uint64_t
uid
);
int
tscGetMetricMeta
(
SSqlObj
*
pSql
);
int
tscGetMeterMeta
(
SSqlObj
*
pSql
,
char
*
meterId
,
int32_t
tableIndex
);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
char
*
meterId
,
bool
createIfNotExists
);
int
tscGetMetricMeta
(
SSqlObj
*
pSql
);
int
tscGetMeterMeta
(
SSqlObj
*
pSql
,
char
*
meterId
,
int32_t
tableIndex
);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
char
*
meterId
,
bool
createIfNotExists
);
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
...
...
@@ -212,9 +212,8 @@ void tscDoQuery(SSqlObj* pSql);
* @param pPrevSql
* @return
*/
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int32_t
vnodeIndex
,
int16_t
tableIndex
,
void
(
*
fp
)(),
void
*
param
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
tableIndex
);
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
void
(
*
fp
)(),
void
*
param
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
tableIndex
);
void
doAddGroupColumnForSubquery
(
SSqlCmd
*
pCmd
,
int32_t
tagIndex
);
...
...
src/client/inc/tsclient.h
浏览文件 @
8de14a0c
...
...
@@ -107,22 +107,25 @@ enum _sql_cmd {
struct
SSqlInfo
;
typedef
struct
SSqlGroupbyExpr
{
int16_t
tableIndex
;
int16_t
tableIndex
;
int16_t
numOfGroupCols
;
SColIndexEx
columnInfo
[
TSDB_MAX_TAGS
];
// group by columns information
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
}
SSqlGroupbyExpr
;
typedef
struct
SMeterMetaInfo
{
SMeterMeta
*
pMeterMeta
;
// metermeta
SMetricMeta
*
pMetricMeta
;
// metricmeta
char
name
[
TSDB_METER_ID_LEN
+
1
];
int16_t
numOfTags
;
// total required tags in query, including groupby tags
int16_t
tagColumnIndex
[
TSDB_MAX_TAGS
];
// clause + tag projection
SMeterMeta
*
pMeterMeta
;
// metermeta
SMetricMeta
*
pMetricMeta
;
// metricmeta
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
*/
int32_t
vnodeIndex
;
char
name
[
TSDB_METER_ID_LEN
+
1
];
// table(super table) name
int16_t
numOfTags
;
// total required tags in query, including groupby tags
int16_t
tagColumnIndex
[
TSDB_MAX_TAGS
];
// clause + tag projection
}
SMeterMetaInfo
;
/* the structure for sql function in select clause */
...
...
@@ -188,7 +191,7 @@ typedef struct SString {
typedef
struct
SCond
{
uint64_t
uid
;
char
*
cond
;
char
*
cond
;
}
SCond
;
typedef
struct
SJoinNode
{
...
...
@@ -262,15 +265,15 @@ typedef struct SDataBlockList {
typedef
struct
{
SOrderVal
order
;
int
command
;
int
count
;
// TODO refactor
int
count
;
// TODO refactor
union
{
bool
existsCheck
;
// check if the table exists
int8_t
showType
;
// show command type
bool
existsCheck
;
// check if the table exists
int8_t
showType
;
// show command type
};
int8_t
isInsertFromFile
;
// load data from file or not
bool
import
;
// import/insert type
bool
import
;
// import/insert type
char
msgType
;
uint16_t
type
;
// query type
char
intervalTimeUnit
;
...
...
@@ -296,7 +299,6 @@ typedef struct {
SLimitVal
slimit
;
int64_t
globalLimit
;
STagCond
tagCond
;
int16_t
vnodeIdx
;
// vnode index in pMetricMeta for metric query
int16_t
interpoType
;
// interpolate type
int16_t
numOfTables
;
...
...
@@ -366,25 +368,23 @@ typedef struct _sql_obj {
STscObj
*
pTscObj
;
void
(
*
fp
)();
void
(
*
fetchFp
)();
void
*
param
;
uint32_t
ip
;
short
vnode
;
int64_t
stime
;
uint32_t
queryId
;
void
*
thandle
;
void
*
pStream
;
char
*
sqlstr
;
char
retry
;
char
maxRetry
;
char
index
;
char
freed
:
4
;
char
listed
:
4
;
tsem_t
rspSem
;
tsem_t
emptyRspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
void
*
param
;
uint32_t
ip
;
short
vnode
;
int64_t
stime
;
uint32_t
queryId
;
void
*
thandle
;
void
*
pStream
;
char
*
sqlstr
;
char
retry
;
char
maxRetry
;
char
index
;
char
freed
:
4
;
char
listed
:
4
;
tsem_t
rspSem
;
tsem_t
emptyRspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
char
numOfSubs
;
struct
_sql_obj
**
pSubs
;
struct
_sql_obj
*
prev
,
*
next
;
...
...
src/client/src/tscAsync.c
浏览文件 @
8de14a0c
...
...
@@ -121,7 +121,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if
(
numOfRows
==
0
&&
tscProjectionQueryOnMetric
(
pCmd
))
{
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
assert
(
pCmd
->
vnodeIdx
>=
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
assert
(
pMeterMetaInfo
->
vnodeIndex
>=
0
);
/* reach the maximum number of output rows, abort */
if
(
pCmd
->
globalLimit
>
0
&&
pRes
->
numOfTotal
>=
pCmd
->
globalLimit
)
{
...
...
@@ -133,8 +134,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
pCmd
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
pCmd
->
limit
.
offset
=
pRes
->
offset
;
if
((
++
(
p
Cmd
->
vnodeIdx
))
<
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMetricMeta
->
numOfVnodes
)
{
tscTrace
(
"%p retrieve data from next vnode:%d"
,
pSql
,
p
Cmd
->
vnodeId
x
);
if
((
++
(
p
MeterMetaInfo
->
vnodeIndex
))
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
tscTrace
(
"%p retrieve data from next vnode:%d"
,
pSql
,
p
MeterMetaInfo
->
vnodeInde
x
);
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
// reset flag to launch query first.
...
...
@@ -272,7 +273,8 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
*/
assert
(
pCmd
->
vnodeIdx
>=
1
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
assert
(
pMeterMetaInfo
->
vnodeIndex
>=
0
);
/* reach the maximum number of output rows, abort */
if
(
pCmd
->
globalLimit
>
0
&&
pRes
->
numOfTotal
>=
pCmd
->
globalLimit
)
{
...
...
@@ -283,7 +285,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
/* update the limit value according to current retrieval results */
pCmd
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
if
((
++
p
Cmd
->
vnodeIdx
)
<=
tscGetMeterMetaInfo
(
pCmd
,
0
)
->
pMetricMeta
->
numOfVnodes
)
{
if
((
++
p
MeterMetaInfo
->
vnodeIndex
)
<=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
// reset flag to launch query first.
tscResetForNextRetrieve
(
pRes
);
...
...
@@ -404,9 +406,12 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
int32_t
code
=
TSDB_CODE_SUCCESS
;
assert
(
!
pCmd
->
isInsertFromFile
&&
pSql
->
signature
==
pSql
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
assert
(
pCmd
->
numOfTables
==
1
);
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
if
(
pDataBlocks
==
NULL
||
p
Cmd
->
vnodeId
x
>=
pDataBlocks
->
nSize
)
{
if
(
pDataBlocks
==
NULL
||
p
MeterMetaInfo
->
vnodeInde
x
>=
pDataBlocks
->
nSize
)
{
// restore user defined fp
pSql
->
fp
=
pSql
->
fetchFp
;
tscTrace
(
"%p Async insertion completed, destroy data block list"
,
pSql
);
...
...
@@ -418,17 +423,17 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
(
*
pSql
->
fp
)(
pSql
->
param
,
tres
,
numOfRows
);
}
else
{
do
{
code
=
tscCopyDataBlockToPayload
(
pSql
,
pDataBlocks
->
pData
[
p
Cmd
->
vnodeId
x
++
]);
code
=
tscCopyDataBlockToPayload
(
pSql
,
pDataBlocks
->
pData
[
p
MeterMetaInfo
->
vnodeInde
x
++
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d"
,
pSql
,
p
Cmd
->
vnodeId
x
-
1
,
pDataBlocks
->
nSize
,
code
);
pSql
,
p
MeterMetaInfo
->
vnodeInde
x
-
1
,
pDataBlocks
->
nSize
,
code
);
}
}
while
(
code
!=
TSDB_CODE_SUCCESS
&&
p
Cmd
->
vnodeId
x
<
pDataBlocks
->
nSize
);
}
while
(
code
!=
TSDB_CODE_SUCCESS
&&
p
MeterMetaInfo
->
vnodeInde
x
<
pDataBlocks
->
nSize
);
// build submit msg may fail
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p async insertion, vnodeIdx:%d, total:%d"
,
pSql
,
p
Cmd
->
vnodeId
x
-
1
,
pDataBlocks
->
nSize
);
tscTrace
(
"%p async insertion, vnodeIdx:%d, total:%d"
,
pSql
,
p
MeterMetaInfo
->
vnodeInde
x
-
1
,
pDataBlocks
->
nSize
);
tscProcessSql
(
pSql
);
}
}
...
...
@@ -484,11 +489,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
// check if it is a sub-query of metric query first, if true, enter another routine
if
((
pSql
->
cmd
.
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
==
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
assert
(
pMeterMetaInfo
->
pMeterMeta
->
numOfTags
!=
0
&&
p
Cmd
->
vnodeId
x
>=
0
&&
pSql
->
param
!=
NULL
);
assert
(
pMeterMetaInfo
->
pMeterMeta
->
numOfTags
!=
0
&&
p
MeterMetaInfo
->
vnodeInde
x
>=
0
&&
pSql
->
param
!=
NULL
);
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
pSql
->
param
;
SSqlObj
*
pParObj
=
trs
->
pParentSqlObj
;
assert
(
pParObj
->
signature
==
pParObj
&&
trs
->
vnodeIdx
==
pCmd
->
vnodeId
x
&&
assert
(
pParObj
->
signature
==
pParObj
&&
trs
->
subqueryIndex
==
pMeterMetaInfo
->
vnodeInde
x
&&
pMeterMetaInfo
->
pMeterMeta
->
numOfTags
!=
0
);
tscTrace
(
"%p get metricMeta during metric query successfully"
,
pSql
);
...
...
src/client/src/tscJoinProcess.c
浏览文件 @
8de14a0c
...
...
@@ -150,7 +150,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
tsBufDestory
(
pSupporter1
->
pTSBuf
);
tsBufDestory
(
pSupporter2
->
pTSBuf
);
tscTrace
(
"%p input1:%lld, input2:%lld, %lld for secondary query after ts blocks intersecting"
,
tscTrace
(
"%p input1:%lld, input2:%lld,
final:
%lld for secondary query after ts blocks intersecting"
,
pSql
,
numOfInput1
,
numOfInput2
,
output1
->
numOfTotal
);
return
output1
->
numOfTotal
;
...
...
@@ -239,15 +239,20 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pSupporter
=
pSql
->
pSubs
[
i
]
->
param
;
pSupporter
->
pState
->
numOfCompleted
=
0
;
/*
* If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery.
*/
if
(
pSupporter
->
exprsInfo
.
numOfExprs
>
0
)
{
++
numOfSub
;
}
}
// scan all subquery, if one sub query has only ts, ignore it
int32_t
j
=
0
;
tscTrace
(
"%p start to launch secondary subqueries: %d"
,
pSql
,
pSql
->
numOfSubs
);
tscTrace
(
"%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in "
"select clause"
,
pSql
,
pSql
->
numOfSubs
,
numOfSub
);
int32_t
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
pSupporter
=
pSub
->
param
;
...
...
@@ -259,15 +264,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
continue
;
}
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
0
,
(
int16_t
)
i
,
tscJoinQueryCallback
,
pSupporter
,
NULL
);
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
(
int16_t
)
i
,
tscJoinQueryCallback
,
pSupporter
,
NULL
);
if
(
pNew
==
NULL
)
{
pSql
->
numOfSubs
=
i
;
//revise the number of subquery
pSupporter
->
pState
->
numOfTotal
=
i
;
pSupporter
->
pState
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscDestroyJoinSupporter
(
pSupporter
);
return
NULL
;
return
0
;
}
tscFreeSqlCmdData
(
&
pNew
->
cmd
);
...
...
@@ -386,8 +390,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if
(
numOfRows
>
0
)
{
// write the data into disk
fwrite
(
pSql
->
res
.
data
,
pSql
->
res
.
numOfRows
,
1
,
pSupporter
->
f
);
f
flush
(
pSupporter
->
f
);
f
close
(
pSupporter
->
f
);
STSBuf
*
pBuf
=
tsBufCreateFromFile
(
pSupporter
->
path
,
true
);
if
(
pBuf
==
NULL
)
{
tscError
(
"%p invalid ts comp file from vnode, abort sub query, file size:%d"
,
pSql
,
numOfRows
);
...
...
@@ -401,7 +405,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace
(
"%p create tmp file for ts block:%s"
,
pSql
,
pBuf
->
path
);
pSupporter
->
pTSBuf
=
pBuf
;
}
else
{
tsBufMerge
(
pSupporter
->
pTSBuf
,
pBuf
,
pSql
->
cmd
.
vnodeIdx
);
assert
(
pSql
->
cmd
.
numOfTables
==
1
);
// for subquery, only one metermetaInfo
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
tsBufMerge
(
pSupporter
->
pTSBuf
,
pBuf
,
pMeterMetaInfo
->
vnodeIndex
);
tsBufDestory
(
pBuf
);
}
...
...
@@ -412,6 +419,20 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a
(
tres
,
joinRetrieveCallback
,
param
);
}
else
if
(
numOfRows
==
0
)
{
// no data from this vnode anymore
if
(
tscProjectionQueryOnMetric
(
&
pParentSql
->
cmd
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
assert
(
pSql
->
cmd
.
numOfTables
==
1
);
// for projection query, need to try next vnode
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pSql
->
fp
=
tscJoinQueryCallback
;
tscProcessSql
(
pSql
);
return
;
}
}
if
(
atomic_add_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
pSupporter
->
pState
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -466,6 +487,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
void
tscFetchDatablockFromSubquery
(
SSqlObj
*
pSql
)
{
int32_t
numOfFetch
=
0
;
assert
(
pSql
->
numOfSubs
>=
1
);
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
SJoinSubquerySupporter
*
pSupporter
=
(
SJoinSubquerySupporter
*
)
pSql
->
pSubs
[
i
]
->
param
;
...
...
@@ -731,7 +754,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
strncpy
(
pTSBuf
->
path
,
path
,
PATH_MAX
);
pTSBuf
->
f
=
fopen
(
pTSBuf
->
path
,
"r"
);
pTSBuf
->
f
=
fopen
(
pTSBuf
->
path
,
"r
+
"
);
if
(
pTSBuf
->
f
==
NULL
)
{
return
NULL
;
}
...
...
@@ -797,6 +820,10 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf
->
cur
.
order
=
TSQL_SO_ASC
;
pTSBuf
->
autoDelete
=
autoDelete
;
tscTrace
(
"create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d"
,
pTSBuf
->
path
,
fileno
(
pTSBuf
->
f
),
pTSBuf
->
fileSize
,
pTSBuf
->
numOfVnodes
,
pTSBuf
->
autoDelete
);
return
pTSBuf
;
}
...
...
@@ -814,10 +841,21 @@ void tsBufDestory(STSBuf* pTSBuf) {
fclose
(
pTSBuf
->
f
);
if
(
pTSBuf
->
autoDelete
)
{
tscTrace
(
"tsBuf %p destroyed, delete tmp file:%s"
,
pTSBuf
,
pTSBuf
->
path
);
unlink
(
pTSBuf
->
path
);
}
else
{
tscTrace
(
"tsBuf %p destroyed, tmp file:%s, remains"
,
pTSBuf
,
pTSBuf
->
path
);
}
free
(
pTSBuf
);
}
static
STSVnodeBlockInfoEx
*
tsBufGetLastVnodeInfo
(
STSBuf
*
pTSBuf
)
{
int32_t
last
=
pTSBuf
->
numOfVnodes
-
1
;
assert
(
last
>=
0
);
return
&
pTSBuf
->
pData
[
last
];
}
static
STSVnodeBlockInfoEx
*
addOneVnodeInfo
(
STSBuf
*
pTSBuf
,
int32_t
vnodeId
)
{
...
...
@@ -836,10 +874,10 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
}
if
(
pTSBuf
->
numOfVnodes
>
0
)
{
STSVnodeBlockInfo
*
pPrevBlockInfo
=
&
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
].
info
;
STSVnodeBlockInfo
Ex
*
pPrevBlockInfoEx
=
tsBufGetLastVnodeInfo
(
pTSBuf
)
;
// update prev vnode length info in file
TSBufUpdateVnodeInfo
(
pTSBuf
,
pTSBuf
->
numOfVnodes
-
1
,
pPrevBlockI
nfo
);
TSBufUpdateVnodeInfo
(
pTSBuf
,
pTSBuf
->
numOfVnodes
-
1
,
&
pPrevBlockInfoEx
->
i
nfo
);
}
// set initial value for vnode block
...
...
@@ -855,11 +893,11 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
pTSBuf
->
numOfVnodes
+=
1
;
// update the header info
STSBufFileHeader
header
=
{
.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfVnode
=
pTSBuf
->
numOfVnodes
,
.
tsOrder
=
pTSBuf
->
tsOrder
};
STSBufFileHeader
header
=
{.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfVnode
=
pTSBuf
->
numOfVnodes
,
.
tsOrder
=
pTSBuf
->
tsOrder
};
STSBufUpdateHeader
(
pTSBuf
,
&
header
);
return
&
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
];
return
tsBufGetLastVnodeInfo
(
pTSBuf
);
}
static
void
shrinkBuffer
(
STSList
*
ptsData
)
{
...
...
@@ -905,9 +943,11 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
pTSBuf
->
fileSize
+=
blockSize
;
pTSBuf
->
tsData
.
len
=
0
;
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
].
info
.
compLen
+=
blockSize
;
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
].
info
.
numOfBlocks
+=
1
;
STSVnodeBlockInfoEx
*
pVnodeBlockInfoEx
=
tsBufGetLastVnodeInfo
(
pTSBuf
);
pVnodeBlockInfoEx
->
info
.
compLen
+=
blockSize
;
pVnodeBlockInfoEx
->
info
.
numOfBlocks
+=
1
;
shrinkBuffer
(
&
pTSBuf
->
tsData
);
}
...
...
@@ -1008,13 +1048,13 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
STSVnodeBlockInfoEx
*
pBlockInfo
=
NULL
;
STSList
*
ptsData
=
&
pTSBuf
->
tsData
;
if
(
pTSBuf
->
numOfVnodes
==
0
||
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
].
info
.
vnode
!=
vnodeId
)
{
if
(
pTSBuf
->
numOfVnodes
==
0
||
tsBufGetLastVnodeInfo
(
pTSBuf
)
->
info
.
vnode
!=
vnodeId
)
{
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
ptsData
);
pBlockInfo
=
addOneVnodeInfo
(
pTSBuf
,
vnodeId
);
}
else
{
pBlockInfo
=
&
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
]
;
pBlockInfo
=
tsBufGetLastVnodeInfo
(
pTSBuf
)
;
}
assert
(
pBlockInfo
->
info
.
vnode
==
vnodeId
);
...
...
@@ -1037,6 +1077,8 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
pTSBuf
->
numOfTotal
+=
len
/
TSDB_KEYSIZE
;
// the size of raw data exceeds the size of the default prepared buffer, so
// during getBufBlock, the output buffer needs to be large enough.
if
(
ptsData
->
len
>=
ptsData
->
threshold
)
{
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
ptsData
);
...
...
@@ -1053,10 +1095,10 @@ void tsBufFlush(STSBuf* pTSBuf) {
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
&
pTSBuf
->
tsData
);
STSVnodeBlockInfo
*
pBlockInfo
=
&
pTSBuf
->
pData
[
pTSBuf
->
numOfVnodes
-
1
].
info
;
STSVnodeBlockInfo
Ex
*
pBlockInfoEx
=
tsBufGetLastVnodeInfo
(
pTSBuf
)
;
// update prev vnode length info in file
TSBufUpdateVnodeInfo
(
pTSBuf
,
pTSBuf
->
numOfVnodes
-
1
,
pBlockI
nfo
);
TSBufUpdateVnodeInfo
(
pTSBuf
,
pTSBuf
->
numOfVnodes
-
1
,
&
pBlockInfoEx
->
i
nfo
);
// save the ts order into header
STSBufFileHeader
header
=
{
...
...
@@ -1157,11 +1199,22 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
}
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
size_t
s
=
pBlock
->
numOfElem
*
TSDB_KEYSIZE
;
/*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
*/
if
(
s
>
pTSBuf
->
tsData
.
allocSize
)
{
expandBuffer
(
&
pTSBuf
->
tsData
,
s
);
}
pTSBuf
->
tsData
.
len
=
tsDecompressTimestamp
(
pBlock
->
payload
,
pBlock
->
compLen
,
pBlock
->
numOfElem
,
pTSBuf
->
tsData
.
rawBuf
,
pTSBuf
->
tsData
.
allocSize
,
TWO_STAGE_COMP
,
pTSBuf
->
assistBuf
,
pTSBuf
->
bufSize
);
assert
(
pTSBuf
->
tsData
.
len
/
TSDB_KEYSIZE
==
pBlock
->
numOfElem
);
assert
(
(
pTSBuf
->
tsData
.
len
/
TSDB_KEYSIZE
==
pBlock
->
numOfElem
)
&&
(
pTSBuf
->
tsData
.
allocSize
>=
pTSBuf
->
tsData
.
len
)
);
pCur
->
vnodeIndex
=
vnodeIndex
;
pCur
->
blockIndex
=
blockIndex
;
...
...
@@ -1293,6 +1346,8 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
return
elem1
;
}
/**
* current only support ts comp data from two vnode merge
* @param pDestBuf
...
...
@@ -1318,7 +1373,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
tsBufFlush
(
pDestBuf
);
// compared with the last vnode id
if
(
vnodeId
!=
pDestBuf
->
pData
[
pDestBuf
->
numOfVnodes
-
1
].
info
.
vnode
)
{
if
(
vnodeId
!=
tsBufGetLastVnodeInfo
(
pDestBuf
)
->
info
.
vnode
)
{
int32_t
oldSize
=
pDestBuf
->
numOfVnodes
;
int32_t
newSize
=
oldSize
+
pSrcBuf
->
numOfVnodes
;
...
...
@@ -1345,36 +1400,49 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
pDestBuf
->
numOfVnodes
=
newSize
;
}
else
{
STSVnodeBlockInfoEx
*
pBlockInfoEx
=
&
pDestBuf
->
pData
[
pDestBuf
->
numOfVnodes
-
1
];
STSVnodeBlockInfoEx
*
pBlockInfoEx
=
tsBufGetLastVnodeInfo
(
pDestBuf
);
pBlockInfoEx
->
len
+=
pSrcBuf
->
pData
[
0
].
len
;
pBlockInfoEx
->
info
.
numOfBlocks
+=
pSrcBuf
->
pData
[
0
].
info
.
numOfBlocks
;
pBlockInfoEx
->
info
.
compLen
+=
pSrcBuf
->
pData
[
0
].
info
.
compLen
;
pBlockInfoEx
->
info
.
vnode
=
vnodeId
;
}
int
64
_t
r
=
fseek
(
pDestBuf
->
f
,
0
,
SEEK_END
);
int
32
_t
r
=
fseek
(
pDestBuf
->
f
,
0
,
SEEK_END
);
assert
(
r
==
0
);
int64_t
offset
=
getDataStartOffset
();
int32_t
size
=
pSrcBuf
->
fileSize
-
offset
;
#ifdef LINUX
ssize_t
rc
=
sendfile
(
fileno
(
pDestBuf
->
f
),
fileno
(
pSrcBuf
->
f
),
&
offset
,
size
);
ssize_t
rc
=
t
sendfile
(
fileno
(
pDestBuf
->
f
),
fileno
(
pSrcBuf
->
f
),
&
offset
,
size
);
#else
ssize_t
rc
=
fsendfile
(
pDestBuf
->
f
,
pSrcBuf
->
f
,
&
offset
,
size
);
#endif
if
(
rc
==
-
1
)
{
printf
(
"%s
\n
"
,
strerror
(
errno
));
tscError
(
"failed to merge tsBuf from:%s to %s, reason:%s
\n
"
,
pSrcBuf
->
path
,
pDestBuf
->
path
,
strerror
(
errno
));
return
-
1
;
}
if
(
rc
!=
size
)
{
printf
(
"%s
\n
"
,
strerror
(
errno
));
tscError
(
"failed to merge tsBuf from:%s to %s, reason:%s
\n
"
,
pSrcBuf
->
path
,
pDestBuf
->
path
,
strerror
(
errno
));
return
-
1
;
}
pDestBuf
->
numOfTotal
+=
pSrcBuf
->
numOfTotal
;
int32_t
oldSize
=
pDestBuf
->
fileSize
;
struct
stat
fileStat
;
fstat
(
fileno
(
pDestBuf
->
f
),
&
fileStat
);
pDestBuf
->
fileSize
=
(
uint32_t
)
fileStat
.
st_size
;
assert
(
pDestBuf
->
fileSize
==
oldSize
+
size
);
tscTrace
(
"tsBuf merge success, %p, path:%s, fd:%d, file size:%d, vnode:%d, autoDelete:%d"
,
pDestBuf
,
pDestBuf
->
path
,
fileno
(
pDestBuf
->
f
),
pDestBuf
->
fileSize
,
pDestBuf
->
numOfVnodes
,
pDestBuf
->
autoDelete
);
return
0
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
8de14a0c
...
...
@@ -498,10 +498,11 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
*
str
+=
index
;
if
(
numOfRows
>=
maxRows
||
pDataBlock
->
size
+
pMeterMeta
->
rowSize
>=
pDataBlock
->
nAllocSize
)
{
int32_t
tSize
=
tscAllocateMemIfNeed
(
pDataBlock
,
pMeterMeta
->
rowSize
);
if
(
0
==
tSize
)
{
if
(
0
==
tSize
)
{
//TODO pass the correct error code to client
strcpy
(
error
,
"client out of memory"
);
return
-
1
;
}
maxRows
+=
tSize
;
}
...
...
@@ -1060,8 +1061,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
// set the next sent data vnode index in data block arraylist
p
Cmd
->
vnodeId
x
=
1
;
p
MeterMetaInfo
->
vnodeInde
x
=
1
;
}
else
{
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
}
...
...
@@ -1279,19 +1282,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
int32_t
code
=
TSDB_CODE_SUCCESS
;
/* the first block has been sent to server in processSQL function */
assert
(
pCmd
->
isInsertFromFile
!=
-
1
&&
p
Cmd
->
vnodeId
x
>=
1
&&
pCmd
->
pDataBlocks
!=
NULL
);
assert
(
pCmd
->
isInsertFromFile
!=
-
1
&&
p
MeterMetaInfo
->
vnodeInde
x
>=
1
&&
pCmd
->
pDataBlocks
!=
NULL
);
if
(
p
Cmd
->
vnodeId
x
<
pCmd
->
pDataBlocks
->
nSize
)
{
if
(
p
MeterMetaInfo
->
vnodeInde
x
<
pCmd
->
pDataBlocks
->
nSize
)
{
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
for
(
int32_t
i
=
p
Cmd
->
vnodeId
x
;
i
<
pDataBlocks
->
nSize
;
++
i
)
{
for
(
int32_t
i
=
p
MeterMetaInfo
->
vnodeInde
x
;
i
<
pDataBlocks
->
nSize
;
++
i
)
{
pDataBlock
=
pDataBlocks
->
pData
[
i
];
if
(
pDataBlock
==
NULL
)
{
continue
;
}
if
((
code
=
tscCopyDataBlockToPayload
(
pSql
,
pDataBlock
))
!=
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p build submit data block failed, vnodeIdx:%d, total:%d"
,
pSql
,
p
Cmd
->
vnodeId
x
,
pDataBlocks
->
nSize
);
tscTrace
(
"%p build submit data block failed, vnodeIdx:%d, total:%d"
,
pSql
,
p
MeterMetaInfo
->
vnodeInde
x
,
pDataBlocks
->
nSize
);
continue
;
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
8de14a0c
...
...
@@ -409,7 +409,9 @@ static int insertStmtReset(STscStmt* pStmt) {
}
}
pCmd
->
batchSize
=
0
;
pCmd
->
vnodeIdx
=
0
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
pMeterMetaInfo
->
vnodeIndex
=
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -422,6 +424,8 @@ static int insertStmtExecute(STscStmt* stmt) {
++
pCmd
->
batchSize
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
if
(
pCmd
->
pDataBlocks
->
nSize
>
0
)
{
// merge according to vgid
int
code
=
tscMergeTableDataBlocks
(
stmt
->
pSql
,
pCmd
->
pDataBlocks
);
...
...
@@ -436,7 +440,7 @@ static int insertStmtExecute(STscStmt* stmt) {
}
// set the next sent data vnode index in data block arraylist
p
Cmd
->
vnodeId
x
=
1
;
p
MeterMetaInfo
->
vnodeInde
x
=
1
;
}
else
{
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
}
...
...
src/client/src/tscServer.c
浏览文件 @
8de14a0c
...
...
@@ -222,7 +222,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
if
(
UTIL_METER_IS_METRIC
(
pMeterMetaInfo
))
{
// multiple vnode query
SVnodeSidList
*
vnodeList
=
tscGetVnodeSidList
(
pMeterMetaInfo
->
pMetricMeta
,
p
Cmd
->
vnodeId
x
);
SVnodeSidList
*
vnodeList
=
tscGetVnodeSidList
(
pMeterMetaInfo
->
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
if
(
vnodeList
!=
NULL
)
{
pVPeersDesc
=
vnodeList
->
vpeerDesc
;
}
...
...
@@ -528,7 +528,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if
(
pMeterMetaInfo
->
pMeterMeta
)
// it may be deleted
pMeterMetaInfo
->
pMeterMeta
->
index
=
pSql
->
index
;
}
else
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMeterMetaInfo
->
pMetricMeta
,
p
Sql
->
cmd
.
vnodeId
x
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMeterMetaInfo
->
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
pVnodeSidList
->
index
=
pSql
->
index
;
}
}
else
{
...
...
@@ -639,7 +639,7 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
static
int
tscLaunchMetricSubQueries
(
SSqlObj
*
pSql
);
// todo merge with callback
int32_t
tscLaunchJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
int16_t
vnodeIdx
,
SJoinSubquerySupporter
*
pSupporter
)
{
int32_t
tscLaunchJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
SJoinSubquerySupporter
*
pSupporter
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pSql
->
res
.
qhandle
=
0x1
;
...
...
@@ -652,12 +652,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId
}
}
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
vnodeIdx
,
tableIndex
,
tscJoinQueryCallback
,
pSupporter
,
NULL
);
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
tableIndex
,
tscJoinQueryCallback
,
pSupporter
,
NULL
);
if
(
pNew
==
NULL
)
{
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
pSql
->
pSubs
[
pSql
->
numOfSubs
++
]
=
pNew
;
assert
(
pSql
->
numOfSubs
<=
pSupporter
->
pState
->
numOfTotal
);
if
(
QUERY_IS_JOIN_QUERY
(
pCmd
->
type
))
{
addGroupInfoForSubquery
(
pSql
,
pNew
,
tableIndex
);
...
...
@@ -774,7 +775,7 @@ int tscProcessSql(SSqlObj *pSql) {
pSql
->
index
=
pMeterMetaInfo
->
pMeterMeta
->
index
;
}
else
{
// it must be the parent SSqlObj for super table query
if
((
pSql
->
cmd
.
type
&
TSDB_QUERY_TYPE_SUBQUERY
)
!=
0
)
{
int32_t
idx
=
p
Sql
->
cmd
.
vnodeId
x
;
int32_t
idx
=
p
MeterMetaInfo
->
vnodeInde
x
;
SVnodeSidList
*
pSidList
=
tscGetVnodeSidList
(
pMeterMetaInfo
->
pMetricMeta
,
idx
);
pSql
->
index
=
pSidList
->
index
;
}
...
...
@@ -802,7 +803,7 @@ int tscProcessSql(SSqlObj *pSql) {
return
pSql
->
res
.
code
;
}
int32_t
code
=
tscLaunchJoinSubquery
(
pSql
,
i
,
0
,
pSupporter
);
int32_t
code
=
tscLaunchJoinSubquery
(
pSql
,
i
,
pSupporter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// failed to create subquery object, quit query
tscDestroyJoinSupporter
(
pSupporter
);
pSql
->
res
.
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
...
...
@@ -944,7 +945,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
trs
->
pOrderDescriptor
=
pDesc
;
trs
->
pState
=
pState
;
trs
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
trs
->
vnodeId
x
=
i
;
trs
->
subqueryInde
x
=
i
;
trs
->
pParentSqlObj
=
pSql
;
trs
->
pFinalColModel
=
pModel
;
...
...
@@ -971,7 +972,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
pNew
->
cmd
.
tsBuf
=
tsBufClone
(
pSql
->
cmd
.
tsBuf
);
}
tscTrace
(
"%p sub:%p launch subquery.orderOfSub:%d"
,
pSql
,
pNew
,
pNew
->
cmd
.
vnodeId
x
);
tscTrace
(
"%p sub:%p launch subquery.orderOfSub:%d"
,
pSql
,
pNew
,
trs
->
subqueryInde
x
);
tscProcessSql
(
pNew
);
}
...
...
@@ -1020,7 +1021,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
static
void
tscHandleSubRetrievalError
(
SRetrieveSupport
*
trsupport
,
SSqlObj
*
pSql
,
int
numOfRows
)
{
SSqlObj
*
pPObj
=
trsupport
->
pParentSqlObj
;
int32_t
idx
=
trsupport
->
vnodeId
x
;
int32_t
subqueryIndex
=
trsupport
->
subqueryInde
x
;
assert
(
pSql
!=
NULL
);
...
...
@@ -1035,27 +1036,27 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
pSql
->
res
.
numOfRows
=
0
;
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
// disable retry efforts
tscTrace
(
"%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
trsupport
->
vnodeId
x
,
trsupport
->
pState
->
code
);
subqueryInde
x
,
trsupport
->
pState
->
code
);
}
if
(
numOfRows
>=
0
)
{
// current query is successful, but other sub query failed, still abort current query.
tscTrace
(
"%p sub:%p retrieve numOfRows:%d,orderOfSub:%d"
,
pPObj
,
pSql
,
numOfRows
,
id
x
);
tscError
(
"%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d"
,
pPObj
,
pSql
,
idx
,
trsupport
->
pState
->
code
);
tscTrace
(
"%p sub:%p retrieve numOfRows:%d,orderOfSub:%d"
,
pPObj
,
pSql
,
numOfRows
,
subqueryInde
x
);
tscError
(
"%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d"
,
pPObj
,
pSql
,
subqueryIndex
,
trsupport
->
pState
->
code
);
}
else
{
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
&&
trsupport
->
pState
->
code
==
TSDB_CODE_SUCCESS
)
{
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear
(
trsupport
->
pExtMemBuffer
[
id
x
]);
tExtMemBufferClear
(
trsupport
->
pExtMemBuffer
[
subqueryInde
x
]);
// clear local saved number of results
trsupport
->
localBuffer
->
numOfElems
=
0
;
pthread_mutex_unlock
(
&
trsupport
->
queryMutex
);
tscTrace
(
"%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
numOfRows
,
id
x
,
trsupport
->
numOfRetry
);
subqueryInde
x
,
trsupport
->
numOfRetry
);
SSqlObj
*
pNew
=
tscCreateSqlObjForSubquery
(
trsupport
->
pParentSqlObj
,
trsupport
,
pSql
);
if
(
pNew
==
NULL
)
{
...
...
@@ -1072,7 +1073,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
}
else
{
// reach the maximum retry count, abort
atomic_val_compare_exchange_32
(
&
trsupport
->
pState
->
code
,
TSDB_CODE_SUCCESS
,
numOfRows
);
tscError
(
"%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d"
,
pPObj
,
pSql
,
numOfRows
,
id
x
,
trsupport
->
pState
->
code
);
numOfRows
,
subqueryInde
x
,
trsupport
->
pState
->
code
);
}
}
...
...
@@ -1115,13 +1116,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
void
tscRetrieveFromVnodeCallBack
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SRetrieveSupport
*
trsupport
=
(
SRetrieveSupport
*
)
param
;
int32_t
idx
=
trsupport
->
vnodeId
x
;
int32_t
idx
=
trsupport
->
subqueryInde
x
;
SSqlObj
*
pPObj
=
trsupport
->
pParentSqlObj
;
tOrderDescriptor
*
pDesc
=
trsupport
->
pOrderDescriptor
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
if
(
pSql
==
NULL
)
{
/* sql object has been released in error process, return immediately */
if
(
pSql
==
NULL
)
{
// sql object has been released in error process, return immediately
tscTrace
(
"%p subquery has been released, idx:%d, abort"
,
pPObj
,
idx
);
return
;
}
...
...
@@ -1172,7 +1172,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
}
else
{
// all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t
numOfRowsFromVnode
=
trsupport
->
pExtMemBuffer
[
pCmd
->
vnodeI
dx
]
->
numOfAllElems
+
trsupport
->
localBuffer
->
numOfElems
;
trsupport
->
pExtMemBuffer
[
i
dx
]
->
numOfAllElems
+
trsupport
->
localBuffer
->
numOfElems
;
tscTrace
(
"%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d"
,
pPObj
,
pSql
,
pSvd
->
ip
,
pSvd
->
vnode
,
numOfRowsFromVnode
,
idx
);
...
...
@@ -1285,10 +1285,10 @@ void tscKillMetricQuery(SSqlObj *pSql) {
static
void
tscRetrieveDataRes
(
void
*
param
,
TAOS_RES
*
tres
,
int
retCode
);
static
SSqlObj
*
tscCreateSqlObjForSubquery
(
SSqlObj
*
pSql
,
SRetrieveSupport
*
trsupport
,
SSqlObj
*
prevSqlObj
)
{
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
trsupport
->
vnodeIdx
,
0
,
tscRetrieveDataRes
,
trsupport
,
prevSqlObj
);
SSqlObj
*
pNew
=
createSubqueryObj
(
pSql
,
0
,
tscRetrieveDataRes
,
trsupport
,
prevSqlObj
);
if
(
pNew
!=
NULL
)
{
// the sub query of two-stage super table query
pNew
->
cmd
.
type
|=
TSDB_QUERY_TYPE_STABLE_SUBQUERY
;
pSql
->
pSubs
[
trsupport
->
vnodeId
x
]
=
pNew
;
pSql
->
pSubs
[
trsupport
->
subqueryInde
x
]
=
pNew
;
}
return
pNew
;
...
...
@@ -1298,8 +1298,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport
*
trsupport
=
(
SRetrieveSupport
*
)
param
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
int32_t
idx
=
pSql
->
cmd
.
vnodeIdx
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
int32_t
idx
=
pMeterMetaInfo
->
vnodeIndex
;
SVnodeSidList
*
vnodeInfo
=
NULL
;
SVPeerDesc
*
pSvd
=
NULL
;
...
...
@@ -1317,7 +1317,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
code
=
trsupport
->
pState
->
code
;
}
tscTrace
(
"%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
trsupport
->
vnodeId
x
,
code
);
trsupport
->
subqueryInde
x
,
code
);
}
/*
...
...
@@ -1337,7 +1337,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj
*
pNew
=
tscCreateSqlObjForSubquery
(
trsupport
->
pParentSqlObj
,
trsupport
,
pSql
);
if
(
pNew
==
NULL
)
{
tscError
(
"%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
pSvd
->
vnode
,
trsupport
->
vnodeId
x
);
trsupport
->
pParentSqlObj
,
pSql
,
pSvd
->
vnode
,
trsupport
->
subqueryInde
x
);
trsupport
->
pState
->
code
=
-
TSDB_CODE_CLI_OUT_OF_MEMORY
;
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
...
...
@@ -1353,17 +1353,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if
(
vnodeInfo
!=
NULL
)
{
tscTrace
(
"%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
vnodeInfo
->
vpeerDesc
[
vnodeInfo
->
index
].
ip
,
vnodeInfo
->
vpeerDesc
[
vnodeInfo
->
index
].
vnode
,
trsupport
->
vnodeId
x
,
trsupport
->
pState
->
code
);
trsupport
->
subqueryInde
x
,
trsupport
->
pState
->
code
);
}
else
{
tscTrace
(
"%p sub:%p query failed,orderOfSub:%d,global code:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
trsupport
->
vnodeId
x
,
trsupport
->
pState
->
code
);
trsupport
->
subqueryInde
x
,
trsupport
->
pState
->
code
);
}
tscRetrieveFromVnodeCallBack
(
param
,
tres
,
trsupport
->
pState
->
code
);
}
else
{
// success, proceed to retrieve data from dnode
tscTrace
(
"%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data"
,
trsupport
->
pParentSqlObj
,
pSql
,
vnodeInfo
->
vpeerDesc
[
vnodeInfo
->
index
].
ip
,
vnodeInfo
->
vpeerDesc
[
vnodeInfo
->
index
].
vnode
,
trsupport
->
vnodeId
x
);
trsupport
->
subqueryInde
x
);
taos_fetch_rows_a
(
tres
,
tscRetrieveFromVnodeCallBack
,
param
);
}
...
...
@@ -1438,7 +1438,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
pQueryMsg
->
vnode
=
htons
(
pMeterMeta
->
vpeerDesc
[
pSql
->
index
].
vnode
);
}
else
{
// query on metric
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
Cmd
->
vnodeId
x
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
pQueryMsg
->
vnode
=
htons
(
pVnodeSidList
->
vpeerDesc
[
pSql
->
index
].
vnode
);
}
}
...
...
@@ -1461,7 +1461,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
Cmd
->
vnodeId
x
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
int32_t
meterInfoSize
=
(
pMetricMeta
->
tagLen
+
sizeof
(
SMeterSidExtInfo
))
*
pVnodeSidList
->
numOfSids
;
int32_t
outputColumnSize
=
pCmd
->
fieldsInfo
.
numOfOutputCols
*
sizeof
(
SSqlFuncExprMsg
);
...
...
@@ -1506,12 +1506,12 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg
->
numOfTagsCols
=
0
;
}
else
{
// query on metric
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
if
(
p
Cmd
->
vnodeId
x
<
0
)
{
tscError
(
"%p error vnodeIdx:%d"
,
pSql
,
p
Cmd
->
vnodeId
x
);
if
(
p
MeterMetaInfo
->
vnodeInde
x
<
0
)
{
tscError
(
"%p error vnodeIdx:%d"
,
pSql
,
p
MeterMetaInfo
->
vnodeInde
x
);
return
-
1
;
}
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
Cmd
->
vnodeId
x
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
uint32_t
vnodeId
=
pVnodeSidList
->
vpeerDesc
[
pVnodeSidList
->
index
].
vnode
;
numOfMeters
=
pVnodeSidList
->
numOfSids
;
...
...
@@ -1693,7 +1693,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg
->
colNameLen
=
htonl
(
len
);
// set sids list
tscTrace
(
"%p vid:%d, query on %d meters"
,
pSql
,
pSql
->
cmd
.
vnodeIdx
,
numOfMeters
);
tscTrace
(
"%p vid:%d, query on %d meters"
,
pSql
,
htons
(
pQueryMsg
->
vnode
)
,
numOfMeters
);
if
(
UTIL_METER_IS_NOMRAL_METER
(
pMeterMetaInfo
))
{
#ifdef _DEBUG_VIEW
...
...
@@ -1703,7 +1703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pSMeterTagInfo
->
sid
=
htonl
(
pMeterMeta
->
sid
);
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
}
else
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
Cmd
->
vnodeId
x
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
p
MeterMetaInfo
->
vnodeInde
x
);
for
(
int32_t
i
=
0
;
i
<
numOfMeters
;
++
i
)
{
SMeterSidExtInfo
*
pMeterTagInfo
=
(
SMeterSidExtInfo
*
)
pMsg
;
...
...
@@ -1774,7 +1774,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
int32_t
numOfBlocks
=
0
;
if
(
pCmd
->
tsBuf
!=
NULL
)
{
STSVnodeBlockInfo
*
pBlockInfo
=
tsBufGetVnodeBlockInfo
(
pCmd
->
tsBuf
,
p
Cmd
->
vnodeId
x
);
STSVnodeBlockInfo
*
pBlockInfo
=
tsBufGetVnodeBlockInfo
(
pCmd
->
tsBuf
,
p
MeterMetaInfo
->
vnodeInde
x
);
assert
(
QUERY_IS_JOIN_QUERY
(
pCmd
->
type
)
&&
pBlockInfo
!=
NULL
);
// this query should not be sent
// todo refactor
...
...
src/client/src/tscSql.c
浏览文件 @
8de14a0c
...
...
@@ -609,7 +609,15 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
assert
((
pRes
->
offset
>=
0
&&
pRes
->
numOfRows
==
0
)
||
(
pRes
->
offset
==
0
&&
pRes
->
numOfRows
>=
0
));
if
((
++
pCmd
->
vnodeIdx
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries, so
* we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql
->
numOfSubs
=
0
;
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pCmd
->
command
=
TSDB_SQL_SELECT
;
assert
(
pSql
->
fp
==
NULL
);
tscProcessSql
(
pSql
);
...
...
@@ -617,7 +625,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// check!!!
if
(
rows
!=
NULL
||
p
Cmd
->
vnodeId
x
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
if
(
rows
!=
NULL
||
p
MeterMetaInfo
->
vnodeInde
x
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
break
;
}
}
...
...
@@ -654,7 +662,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pCmd
->
limit
.
offset
=
pRes
->
offset
;
if
((
++
p
Sql
->
cmd
.
vnodeId
x
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
if
((
++
p
MeterMetaInfo
->
vnodeInde
x
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
assert
(
pSql
->
fp
==
NULL
);
tscProcessSql
(
pSql
);
...
...
@@ -662,7 +670,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
// check!!!
if
(
*
rows
!=
NULL
||
p
Cmd
->
vnodeId
x
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
if
(
*
rows
!=
NULL
||
p
MeterMetaInfo
->
vnodeInde
x
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
break
;
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
8de14a0c
...
...
@@ -1474,7 +1474,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
* data blocks have been submit to vnode.
*/
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
if
(
pDataBlocks
==
NULL
||
pCmd
->
vnodeIdx
>=
pDataBlocks
->
nSize
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
);
assert
(
pSql
->
cmd
.
numOfTables
==
1
);
if
(
pDataBlocks
==
NULL
||
pMeterMetaInfo
->
vnodeIndex
>=
pDataBlocks
->
nSize
)
{
tscTrace
(
"%p object should be release since all data blocks have been submit"
,
pSql
);
return
true
;
}
else
{
...
...
@@ -1487,10 +1491,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
SMeterMetaInfo
*
tscGetMeterMetaInfo
(
SSqlCmd
*
pCmd
,
int32_t
index
)
{
if
(
pCmd
==
NULL
||
index
>=
pCmd
->
numOfTables
||
index
<
0
)
{
if
(
pCmd
==
NULL
||
pCmd
->
numOfTables
==
0
)
{
return
NULL
;
}
assert
(
index
>=
0
&&
index
<=
pCmd
->
numOfTables
&&
pCmd
->
pMeterInfo
!=
NULL
);
return
pCmd
->
pMeterInfo
[
index
];
}
...
...
@@ -1587,13 +1592,13 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes
->
numOfRows
=
0
;
}
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int32_t
vnodeIndex
,
int16_t
tableIndex
,
void
(
*
fp
)(),
void
*
param
,
SSqlObj
*
pPrevSql
)
{
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
void
(
*
fp
)(),
void
*
param
,
SSqlObj
*
pPrevSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
tableIndex
);
SSqlObj
*
pNew
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pNew
==
NULL
)
{
tscError
(
"%p new subquery failed,
vnodeIdx:%d, tableIndex:%d"
,
pSql
,
vnodeIndex
,
tabl
eIndex
);
tscError
(
"%p new subquery failed,
tableIndex:%d, vnodeIndex:%d"
,
pSql
,
tableIndex
,
pMeterMetaInfo
->
vnod
eIndex
);
return
NULL
;
}
...
...
@@ -1602,7 +1607,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed,
vnodeIdx:%d, tableIndex:%d"
,
pSql
,
vnodeIndex
,
tabl
eIndex
);
tscError
(
"%p new subquery failed,
tableIndex:%d, vnodeIndex:%d"
,
pSql
,
tableIndex
,
pMeterMetaInfo
->
vnod
eIndex
);
free
(
pNew
);
return
NULL
;
...
...
@@ -1627,15 +1632,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
tscTagCondCopy
(
&
pNew
->
cmd
.
tagCond
,
&
pCmd
->
tagCond
);
if
(
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p new subquery failed,
vnodeIdx:%d, tableIndex:%d"
,
pSql
,
vnodeIndex
,
tabl
eIndex
);
tscError
(
"%p new subquery failed,
tableIndex:%d, vnodeIndex:%d"
,
pSql
,
tableIndex
,
pMeterMetaInfo
->
vnod
eIndex
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
tscColumnBaseInfoCopy
(
&
pNew
->
cmd
.
colList
,
&
pCmd
->
colList
,
(
int16_t
)
tableIndex
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
tableIndex
);
// set the correct query type
if
(
pPrevSql
!=
NULL
)
{
pNew
->
cmd
.
type
=
pPrevSql
->
cmd
.
type
;
...
...
@@ -1666,7 +1669,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
pNew
->
fp
=
fp
;
pNew
->
param
=
param
;
pNew
->
cmd
.
vnodeIdx
=
vnodeIndex
;
SMeterMetaInfo
*
pMetermetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
tableIndex
);
char
key
[
TSDB_MAX_TAGS_LEN
+
1
]
=
{
0
};
...
...
@@ -1695,8 +1697,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
assert
(
pFinalInfo
->
pMetricMeta
!=
NULL
);
}
tscTrace
(
"%p new subquery %p,
vnodeIdx:%d, tableIndex:%d, type:%d"
,
pSql
,
pNew
,
vnodeIndex
,
tableIndex
,
pNew
->
cmd
.
type
);
tscTrace
(
"%p new subquery %p,
tableIndex:%d, vnodeIdx:%d, type:%d"
,
pSql
,
pNew
,
tableIndex
,
pMeterMetaInfo
->
vnodeIndex
,
pNew
->
cmd
.
type
);
return
pNew
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录