Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
57417c7f
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
57417c7f
编写于
10月 21, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge from develop into feature/os
上级
9101e505
5c1151e3
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
504 addition
and
310 deletion
+504
-310
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+6
-6
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+12
-10
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+5
-7
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+2
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+41
-39
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+14
-3
src/client/src/tscServer.c
src/client/src/tscServer.c
+24
-14
src/client/src/tscSql.c
src/client/src/tscSql.c
+19
-3
src/client/src/tscStream.c
src/client/src/tscStream.c
+3
-3
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+7
-16
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+5
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+114
-93
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+7
-7
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+2
-1
src/inc/dnode.h
src/inc/dnode.h
+1
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+27
-9
src/mnode/inc/mnodeDef.h
src/mnode/inc/mnodeDef.h
+1
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+41
-38
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+5
-5
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-0
src/query/inc/qSqlparser.h
src/query/inc/qSqlparser.h
+0
-2
src/query/src/qAst.c
src/query/src/qAst.c
+1
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+21
-4
src/query/src/qParserImpl.c
src/query/src/qParserImpl.c
+5
-22
src/query/src/qPercentile.c
src/query/src/qPercentile.c
+1
-1
src/query/src/qSyntaxtreefunction.c
src/query/src/qSyntaxtreefunction.c
+3
-0
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+1
-1
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-1
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+34
-10
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+1
-1
tests/script/general/parser/col_arithmetic_operation.sim
tests/script/general/parser/col_arithmetic_operation.sim
+56
-0
tests/script/general/parser/constCol.sim
tests/script/general/parser/constCol.sim
+32
-0
tests/script/general/parser/tags_dynamically_specifiy.sim
tests/script/general/parser/tags_dynamically_specifiy.sim
+5
-0
tests/script/general/parser/union.sim
tests/script/general/parser/union.sim
+2
-2
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
57417c7f
...
...
@@ -149,14 +149,13 @@ int tscAllocPayload(SSqlCmd* pCmd, int size);
TAOS_FIELD
tscCreateField
(
int8_t
type
,
const
char
*
name
,
int16_t
bytes
);
S
FieldSupInfo
*
tscFieldInfoAppend
(
SFieldInfo
*
pFieldInfo
,
TAOS_FIELD
*
pField
);
S
FieldSupInfo
*
tscFieldInfoInsert
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
field
);
S
InternalField
*
tscFieldInfoAppend
(
SFieldInfo
*
pFieldInfo
,
TAOS_FIELD
*
pField
);
S
InternalField
*
tscFieldInfoInsert
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
field
);
S
FieldSupInfo
*
tscFieldInfoGetSupp
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
S
InternalField
*
tscFieldInfoGetInternalField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
void
tscFieldInfoUpdateOffset
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoCopy
(
SFieldInfo
*
dst
,
const
SFieldInfo
*
src
);
void
tscFieldInfoUpdateOffsetForInterResult
(
SQueryInfo
*
pQueryInfo
);
int16_t
tscFieldInfoGetOffset
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
...
...
@@ -231,10 +230,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
bool
createIfNotExists
);
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscAddTimestampColumn
(
SQueryInfo
*
pQueryInfo
,
int16_t
functionId
,
int16_t
tableIndex
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
SVgroupsInfo
*
tscVgroupInfoClone
(
SVgroupsInfo
*
pInfo
);
void
*
tscVgroupInfoClear
(
SVgroupsInfo
*
pInfo
);
void
tscSCMVgroupInfoCopy
(
SCMVgroupInfo
*
dst
,
const
SCMVgroupInfo
*
src
);
/**
* The create object function must be successful expect for the out of memory issue.
*
...
...
src/client/inc/tsclient.h
浏览文件 @
57417c7f
...
...
@@ -90,10 +90,10 @@ typedef struct STableComInfo {
}
STableComInfo
;
typedef
struct
SCMCorVgroupInfo
{
int32_t
version
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
int32_t
version
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
1
epAddr
[
TSDB_MAX_REPLICA
];
}
SCMCorVgroupInfo
;
typedef
struct
STableMeta
{
...
...
@@ -142,16 +142,17 @@ typedef struct SColumnIndex {
int16_t
columnIndex
;
}
SColumnIndex
;
typedef
struct
SFieldSupInfo
{
typedef
struct
SInternalField
{
TAOS_FIELD
field
;
bool
visible
;
SExprInfo
*
pArithExprInfo
;
SSqlExpr
*
pSqlExpr
;
}
S
FieldSupInfo
;
}
S
InternalField
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
SArray
*
pFields
;
// SArray<TAOS_FIELD>
SArray
*
pSupportInfo
;
// SArray<SFieldSupInfo
>
int16_t
numOfOutput
;
// number of column in result
TAOS_FIELD
*
final
;
SArray
*
internalField
;
// SArray<SInternalField
>
}
SFieldInfo
;
typedef
struct
SColumn
{
...
...
@@ -443,6 +444,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql);
*/
void
tscFreeSqlObj
(
SSqlObj
*
pSql
);
void
tscFreeRegisteredSqlObj
(
void
*
pSql
);
void
tscFreeTableMetaHelper
(
void
*
pTableMeta
);
void
tscCloseTscObj
(
STscObj
*
pObj
);
...
...
@@ -467,7 +469,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t
tscToSQLCmd
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
static
FORCE_INLINE
void
tscGetResultColumnChr
(
SSqlRes
*
pRes
,
SFieldInfo
*
pFieldInfo
,
int32_t
columnIndex
)
{
S
FieldSupInfo
*
pInfo
=
(
SFieldSupInfo
*
)
TARRAY_GET_ELEM
(
pFieldInfo
->
pSupportInfo
,
columnIndex
);
S
InternalField
*
pInfo
=
(
SInternalField
*
)
TARRAY_GET_ELEM
(
pFieldInfo
->
internalField
,
columnIndex
);
assert
(
pInfo
->
pSqlExpr
!=
NULL
);
int32_t
type
=
pInfo
->
pSqlExpr
->
resType
;
...
...
src/client/src/tscAsync.c
浏览文件 @
57417c7f
...
...
@@ -327,7 +327,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
){
S
FieldSupInfo
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
S
InternalField
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
internalField
,
i
);
if
(
pSup
->
pSqlExpr
!=
NULL
)
{
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
}
else
{
...
...
@@ -348,7 +348,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
{
S
FieldSupInfo
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
S
InternalField
*
pSup
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
internalField
,
i
);
if
(
pSup
->
pSqlExpr
!=
NULL
)
{
tscGetResultColumnChr
(
pRes
,
&
pQueryInfo
->
fieldsInfo
,
i
);
...
...
@@ -405,11 +405,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
code
=
code
;
const
char
*
msg
=
(
pCmd
->
command
==
TSDB_SQL_STABLEVGROUP
)
?
"vgroup-list"
:
"table-meta"
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p get
tableMeta failed, code:%s"
,
pSql
,
tstrerror
(
code
));
tscError
(
"%p get
%s failed, code:%s"
,
pSql
,
msg
,
tstrerror
(
code
));
goto
_error
;
}
else
{
const
char
*
msg
=
(
pCmd
->
command
==
TSDB_SQL_STABLEVGROUP
)
?
"vgroup-list"
:
"table-meta"
;
tscDebug
(
"%p get %s successfully"
,
pSql
,
msg
);
}
...
...
src/client/src/tscLocal.c
浏览文件 @
57417c7f
...
...
@@ -239,7 +239,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
TAOS_FIELD
f
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
(
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
};
tstrncpy
(
f
.
name
,
"Field"
,
sizeof
(
f
.
name
));
S
FieldSupInfo
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
S
InternalField
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
(
TSDB_COL_NAME_LEN
-
1
),
false
);
...
...
@@ -485,7 +485,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
tstrncpy
(
f
.
name
,
"Database"
,
sizeof
(
f
.
name
));
}
S
FieldSupInfo
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
S
InternalField
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
f
.
bytes
,
f
.
bytes
-
VARSTR_HEADER_SIZE
,
false
);
...
...
@@ -922,19 +922,17 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
tscFieldInfoClear
(
&
pQueryInfo
->
fieldsInfo
);
pQueryInfo
->
fieldsInfo
.
pFields
=
taosArrayInit
(
1
,
sizeof
(
TAOS_FIELD
));
pQueryInfo
->
fieldsInfo
.
pSupportInfo
=
taosArrayInit
(
1
,
sizeof
(
SFieldSupInfo
));
pQueryInfo
->
fieldsInfo
.
internalField
=
taosArrayInit
(
1
,
sizeof
(
SInternalField
));
TAOS_FIELD
f
=
tscCreateField
((
int8_t
)
type
,
columnName
,
(
int16_t
)
valueLength
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
tscInitResObjForLocalQuery
(
pSql
,
1
,
(
int32_t
)
valueLength
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
SFieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
0
);
SInternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
pInfo
->
pSqlExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
memcpy
(
pRes
->
data
,
val
,
p
Field
->
bytes
);
memcpy
(
pRes
->
data
,
val
,
p
Info
->
field
.
bytes
);
}
int
tscProcessLocalCmd
(
SSqlObj
*
pSql
)
{
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
57417c7f
...
...
@@ -510,7 +510,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosTFree
(
pLocalReducer
->
pResultBuf
);
if
(
pLocalReducer
->
pResInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
size_t
num
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
taosTFree
(
pLocalReducer
->
pResInfo
[
i
].
interResultBuf
);
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
57417c7f
...
...
@@ -122,7 +122,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
static
int32_t
doCheckForCreateFromStable
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
static
int32_t
doCheckForStream
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
static
int32_t
doCheckForQuery
(
SSqlObj
*
pSql
,
SQuerySQL
*
pQuerySql
,
int32_t
index
);
static
int32_t
exprTreeFromSqlExpr
(
SSqlCmd
*
pCmd
,
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
S
Array
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
);
static
int32_t
exprTreeFromSqlExpr
(
SSqlCmd
*
pCmd
,
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
S
QueryInfo
*
pQueryInfo
,
SArray
*
pCols
,
int64_t
*
uid
);
/*
* Used during parsing query sql. Since the query sql usually small in length, error position
...
...
@@ -190,7 +190,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
!
pInfo
->
valid
)
{
if
(
!
pInfo
->
valid
||
terrno
==
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
)
{
terrno
=
TSDB_CODE_SUCCESS
;
// clear the error number
return
tscSQLSyntaxErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
NULL
,
pInfo
->
pzErrMsg
);
}
...
...
@@ -1249,7 +1250,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
tExprNode
*
pNode
=
NULL
;
SArray
*
colList
=
taosArrayInit
(
10
,
sizeof
(
SColIndex
));
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pNode
,
pItem
->
pNode
,
pQueryInfo
->
exprList
,
pQueryInfo
,
colList
);
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pNode
,
pItem
->
pNode
,
pQueryInfo
,
colList
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
colList
);
tExprTreeDestroy
(
&
pNode
,
NULL
);
...
...
@@ -1305,17 +1306,17 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
insertResultField
(
pQueryInfo
,
exprIndex
,
&
columnList
,
sizeof
(
double
),
TSDB_DATA_TYPE_DOUBLE
,
aliasName
,
NULL
);
int32_t
slot
=
tscNumOfFields
(
pQueryInfo
)
-
1
;
S
FieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
slot
);
S
InternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
slot
);
if
(
pInfo
->
pSqlExpr
==
NULL
)
{
SExprInfo
*
pArithExprInfo
=
calloc
(
1
,
sizeof
(
SExprInfo
));
// arithmetic expression always return result in the format of double float
pArithExprInfo
->
bytes
=
sizeof
(
double
);
pArithExprInfo
->
bytes
=
sizeof
(
double
);
pArithExprInfo
->
interBytes
=
sizeof
(
double
);
pArithExprInfo
->
type
=
TSDB_DATA_TYPE_DOUBLE
;
pArithExprInfo
->
type
=
TSDB_DATA_TYPE_DOUBLE
;
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pArithExprInfo
->
pExpr
,
pItem
->
pNode
,
pQueryInfo
->
exprList
,
pQueryInfo
,
NULL
);
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pArithExprInfo
->
pExpr
,
pItem
->
pNode
,
pQueryInfo
,
NULL
,
&
pArithExprInfo
->
uid
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tExprTreeDestroy
(
&
pArithExprInfo
->
pExpr
,
NULL
);
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
"invalid expression in select clause"
);
...
...
@@ -1372,7 +1373,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
int32_t
numOfCols
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
);
tscAddSpecialColumnForSelect
(
pQueryInfo
,
numOfCols
,
TSDB_FUNC_PRJ
,
&
index
,
pSchema
,
TSDB_COL_NORMAL
);
S
FieldSupInfo
*
pSupInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
numOfCols
);
S
InternalField
*
pSupInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
numOfCols
);
pSupInfo
->
visible
=
false
;
pQueryInfo
->
type
|=
TSDB_QUERY_TYPE_PROJECTION_QUERY
;
...
...
@@ -1469,7 +1470,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
}
TAOS_FIELD
f
=
tscCreateField
(
type
,
fieldName
,
bytes
);
S
FieldSupInfo
*
pInfo
=
tscFieldInfoInsert
(
&
pQueryInfo
->
fieldsInfo
,
outputIndex
,
&
f
);
S
InternalField
*
pInfo
=
tscFieldInfoInsert
(
&
pQueryInfo
->
fieldsInfo
,
outputIndex
,
&
f
);
pInfo
->
pSqlExpr
=
pSqlExpr
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3384,10 +3385,26 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer
tSQLExprItem
item
=
{.
pNode
=
pExpr
,
.
aliasName
=
NULL
};
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
// sql function list in selection clause.
// Append the sqlExpr into exprList of pQueryInfo structure sequentially
if
(
addExprAndResultField
(
pCmd
,
pQueryInfo
,
outputIndex
,
&
item
,
false
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// It is invalid in case of more than one sqlExpr, such as first(ts, k) - last(ts, k)
int32_t
inc
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
)
-
outputIndex
;
if
(
inc
>
1
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// Not supported data type in arithmetic expression
for
(
int32_t
i
=
0
;
i
<
inc
;
++
i
)
{
SSqlExpr
*
p1
=
tscSqlExprGet
(
pQueryInfo
,
i
+
outputIndex
);
int16_t
t
=
p1
->
resType
;
if
(
t
==
TSDB_DATA_TYPE_BINARY
||
t
==
TSDB_DATA_TYPE_NCHAR
||
t
==
TSDB_DATA_TYPE_BOOL
||
t
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -4062,7 +4079,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
tExprNode
*
p
=
NULL
;
SArray
*
colList
=
taosArrayInit
(
10
,
sizeof
(
SColIndex
));
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
p
,
p1
,
NULL
,
pQueryInfo
,
colList
);
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
p
,
p1
,
pQueryInfo
,
colList
,
NULL
);
SBufferWriter
bw
=
tbufInitWriter
(
NULL
,
false
);
TRY
(
0
)
{
...
...
@@ -4733,7 +4750,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// validate the length of binary
if
((
pTagsSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pTagsSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
&&
(
pVarList
->
a
[
1
].
pVar
.
nLen
+
VARSTR_HEADER_SIZE
)
>
pTagsSchema
->
bytes
)
{
varDataTLen
(
pAlterSQL
->
tagData
.
data
)
>
pTagsSchema
->
bytes
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg14
);
}
...
...
@@ -5259,26 +5276,6 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
return
TSDB_CODE_SUCCESS
;
}
//void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex) {
// // the first column not timestamp column, add it
// SSqlExpr* pExpr = NULL;
// if (tscSqlExprNumOfExprs(pQueryInfo) > 0) {
// pExpr = tscSqlExprGet(pQueryInfo, 0);
// }
//
// if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) {
// SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
//
// pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
// pExpr->colInfo.flag = TSDB_COL_NORMAL;
//
// // NOTE: tag column does not add to source column list
// SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX);
//
// insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr);
// }
//}
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
int32_t
tableIndex
)
{
SQueryInfo
*
pParentQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentObj
->
cmd
,
subClauseIndex
);
...
...
@@ -5335,7 +5332,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
tscAddSpecialColumnForSelect
(
pQueryInfo
,
(
int32_t
)
size
,
TSDB_FUNC_PRJ
,
&
colIndex
,
pSchema
,
TSDB_COL_NORMAL
);
S
FieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
(
int32_t
)
size
);
S
InternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
(
int32_t
)
size
);
doLimitOutputNormalColOfGroupby
(
pInfo
->
pSqlExpr
);
pInfo
->
visible
=
false
;
}
...
...
@@ -6380,19 +6377,19 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
TSDB_CODE_SUCCESS
;
// Does not build query message here
}
int32_t
exprTreeFromSqlExpr
(
SSqlCmd
*
pCmd
,
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
S
Array
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
)
{
int32_t
exprTreeFromSqlExpr
(
SSqlCmd
*
pCmd
,
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
S
QueryInfo
*
pQueryInfo
,
SArray
*
pCols
,
int64_t
*
uid
)
{
tExprNode
*
pLeft
=
NULL
;
tExprNode
*
pRight
=
NULL
;
if
(
pSqlExpr
->
pLeft
!=
NULL
)
{
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pLeft
,
pSqlExpr
->
pLeft
,
p
ExprInfo
,
pQueryInfo
,
pCols
);
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pLeft
,
pSqlExpr
->
pLeft
,
p
QueryInfo
,
pCols
,
uid
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
}
if
(
pSqlExpr
->
pRight
!=
NULL
)
{
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pRight
,
pSqlExpr
->
pRight
,
p
ExprInfo
,
pQueryInfo
,
pCols
);
int32_t
ret
=
exprTreeFromSqlExpr
(
pCmd
,
&
pRight
,
pSqlExpr
->
pRight
,
p
QueryInfo
,
pCols
,
uid
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
...
...
@@ -6419,14 +6416,19 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
strncpy
((
*
pExpr
)
->
pSchema
->
name
,
pSqlExpr
->
operand
.
z
,
pSqlExpr
->
operand
.
n
);
// set the input column data byte and type.
size_t
size
=
taosArrayGetSize
(
p
ExprInfo
);
size_t
size
=
taosArrayGetSize
(
p
QueryInfo
->
exprList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
p1
=
taosArrayGetP
(
p
ExprInfo
,
i
);
SSqlExpr
*
p1
=
taosArrayGetP
(
p
QueryInfo
->
exprList
,
i
);
if
(
strcmp
((
*
pExpr
)
->
pSchema
->
name
,
p1
->
aliasName
)
==
0
)
{
(
*
pExpr
)
->
pSchema
->
type
=
(
uint8_t
)
p1
->
resType
;
(
*
pExpr
)
->
pSchema
->
type
=
(
uint8_t
)
p1
->
resType
;
(
*
pExpr
)
->
pSchema
->
bytes
=
p1
->
resBytes
;
if
(
uid
!=
NULL
)
{
*
uid
=
p1
->
uid
;
}
break
;
}
}
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
57417c7f
...
...
@@ -145,10 +145,11 @@ static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
vgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
corVgroupInfo
->
numOfEps
;
i
++
)
{
strncpy
(
corVgroupInfo
->
epAddr
[
i
].
fqdn
,
vgroupInfo
->
epAddr
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strdup
(
vgroupInfo
->
epAddr
[
i
].
fqdn
);
corVgroupInfo
->
epAddr
[
i
].
port
=
vgroupInfo
->
epAddr
[
i
].
port
;
}
}
STableMeta
*
tscCreateTableMetaFromMsg
(
STableMetaMsg
*
pTableMetaMsg
,
size_t
*
size
)
{
assert
(
pTableMetaMsg
!=
NULL
);
...
...
@@ -162,9 +163,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
.
numOfColumns
=
pTableMetaMsg
->
numOfColumns
,
};
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
s
id
;
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
t
id
;
pTableMeta
->
id
.
uid
=
pTableMetaMsg
->
uid
;
pTableMeta
->
vgroupInfo
=
pTableMetaMsg
->
vgroup
;
SCMVgroupInfo
*
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
pVgroupInfo
->
numOfEps
=
pTableMetaMsg
->
vgroup
.
numOfEps
;
pVgroupInfo
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
SEpAddrMsg
*
pEpMsg
=
&
pTableMetaMsg
->
vgroup
.
epAddr
[
i
];
pVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pEpMsg
->
fqdn
,
tListLen
(
pEpMsg
->
fqdn
));
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpMsg
->
port
;
}
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
&
pTableMeta
->
vgroupInfo
);
...
...
src/client/src/tscServer.c
浏览文件 @
57417c7f
...
...
@@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
pVgroupInfo
->
inUse
=
pEpSet
->
inUse
;
pVgroupInfo
->
numOfEps
=
pEpSet
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
i
++
)
{
tstrncpy
(
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
pEpSet
->
fqdn
[
i
],
TSDB_FQDN_LEN
);
taosTFree
(
pVgroupInfo
->
epAddr
[
i
].
fqdn
);
pVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pEpSet
->
fqdn
[
i
],
tListLen
(
pEpSet
->
fqdn
[
i
]));
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpSet
->
port
[
i
];
}
tscDebug
(
"after: EndPoint in use: %d"
,
pVgroupInfo
->
inUse
);
taosCorEndWrite
(
&
pVgroupInfo
->
version
);
}
...
...
@@ -1648,7 +1650,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
tscProcessTableMetaRsp
(
SSqlObj
*
pSql
)
{
STableMetaMsg
*
pMetaMsg
=
(
STableMetaMsg
*
)
pSql
->
res
.
pRsp
;
pMetaMsg
->
sid
=
htonl
(
pMetaMsg
->
s
id
);
pMetaMsg
->
tid
=
htonl
(
pMetaMsg
->
t
id
);
pMetaMsg
->
sversion
=
htons
(
pMetaMsg
->
sversion
);
pMetaMsg
->
tversion
=
htons
(
pMetaMsg
->
tversion
);
pMetaMsg
->
vgroup
.
vgId
=
htonl
(
pMetaMsg
->
vgroup
.
vgId
);
...
...
@@ -1658,9 +1660,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg
->
numOfColumns
=
htons
(
pMetaMsg
->
numOfColumns
);
if
((
pMetaMsg
->
tableType
!=
TSDB_SUPER_TABLE
)
&&
(
pMetaMsg
->
s
id
<=
0
||
pMetaMsg
->
vgroup
.
vgId
<
2
||
pMetaMsg
->
vgroup
.
numOfEps
<=
0
))
{
(
pMetaMsg
->
t
id
<=
0
||
pMetaMsg
->
vgroup
.
vgId
<
2
||
pMetaMsg
->
vgroup
.
numOfEps
<=
0
))
{
tscError
(
"invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s"
,
pMetaMsg
->
vgroup
.
numOfEps
,
pMetaMsg
->
vgroup
.
vgId
,
pMetaMsg
->
s
id
,
pMetaMsg
->
tableId
);
pMetaMsg
->
t
id
,
pMetaMsg
->
tableId
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
...
...
@@ -1839,22 +1841,30 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
parent
->
cmd
;
for
(
int32_t
i
=
0
;
i
<
pStableVgroup
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
i
);
SVgroupsInfo
*
pVgroupInfo
=
(
SVgroupsInfo
*
)
pMsg
;
pVgroupInfo
->
numOfVgroups
=
htonl
(
pVgroupInfo
->
numOfVgroups
);
size_t
size
=
sizeof
(
SCMVgroupInfo
)
*
pVgroupInfo
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
pInfo
->
vgroupList
=
calloc
(
1
,
size
);
SVgroupsMsg
*
pVgroupMsg
=
(
SVgroupsMsg
*
)
pMsg
;
pVgroupMsg
->
numOfVgroups
=
htonl
(
pVgroupMsg
->
numOfVgroups
);
size_t
size
=
sizeof
(
SCMVgroupMsg
)
*
pVgroupMsg
->
numOfVgroups
+
sizeof
(
SVgroupsMsg
);
size_t
vgroupsz
=
sizeof
(
SCMVgroupInfo
)
*
pVgroupMsg
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
pInfo
->
vgroupList
=
calloc
(
1
,
vgroupsz
);
assert
(
pInfo
->
vgroupList
!=
NULL
);
memcpy
(
pInfo
->
vgroupList
,
pVgroupInfo
,
size
)
;
pInfo
->
vgroupList
->
numOfVgroups
=
pVgroupMsg
->
numOfVgroups
;
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgroupList
->
numOfVgroups
;
++
j
)
{
//just init, no need to lock
SCMVgroupInfo
*
pVgroups
=
&
pInfo
->
vgroupList
->
vgroups
[
j
];
pVgroups
->
vgId
=
htonl
(
pVgroups
->
vgId
);
assert
(
pVgroups
->
numOfEps
>=
1
);
SCMVgroupMsg
*
vmsg
=
&
pVgroupMsg
->
vgroups
[
j
];
pVgroups
->
vgId
=
htonl
(
vmsg
->
vgId
);
pVgroups
->
numOfEps
=
vmsg
->
numOfEps
;
assert
(
pVgroups
->
numOfEps
>=
1
&&
pVgroups
->
vgId
>=
1
);
for
(
int32_t
k
=
0
;
k
<
pVgroups
->
numOfEps
;
++
k
)
{
pVgroups
->
epAddr
[
k
].
port
=
htons
(
pVgroups
->
epAddr
[
k
].
port
);
pVgroups
->
epAddr
[
k
].
port
=
htons
(
vmsg
->
epAddr
[
k
].
port
);
pVgroups
->
epAddr
[
k
].
fqdn
=
strndup
(
vmsg
->
epAddr
[
k
].
fqdn
,
tListLen
(
vmsg
->
epAddr
[
k
].
fqdn
));
}
}
...
...
@@ -1890,7 +1900,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pMetaMsg
->
numOfColumns
=
ntohs
(
pMetaMsg
->
numOfColumns
);
pSchema
=
pMetaMsg
->
schema
;
pMetaMsg
->
sid
=
ntohs
(
pMetaMsg
->
s
id
);
pMetaMsg
->
tid
=
ntohs
(
pMetaMsg
->
t
id
);
for
(
int
i
=
0
;
i
<
pMetaMsg
->
numOfColumns
;
++
i
)
{
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
pSchema
++
;
...
...
@@ -1924,7 +1934,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
);
TAOS_FIELD
f
=
tscCreateField
(
pSchema
->
type
,
pSchema
->
name
,
pSchema
->
bytes
);
S
FieldSupInfo
*
pInfo
=
tscFieldInfoAppend
(
pFieldInfo
,
&
f
);
S
InternalField
*
pInfo
=
tscFieldInfoAppend
(
pFieldInfo
,
&
f
);
pInfo
->
pSqlExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
pTableSchema
[
i
].
type
,
pTableSchema
[
i
].
bytes
,
pTableSchema
[
i
].
bytes
,
false
);
...
...
src/client/src/tscSql.c
浏览文件 @
57417c7f
...
...
@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
size_t
numOfCols
=
tscNumOfFields
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
S
FieldSupInfo
*
pInfo
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
S
InternalField
*
pInfo
=
taosArrayGet
(
pQueryInfo
->
fieldsInfo
.
internalField
,
i
);
if
(
pInfo
->
visible
)
{
num
++
;
}
...
...
@@ -409,8 +409,24 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if
(
numOfCols
==
0
)
{
return
NULL
;
}
return
pQueryInfo
->
fieldsInfo
.
pFields
->
pData
;
SFieldInfo
*
pFieldInfo
=
&
pQueryInfo
->
fieldsInfo
;
if
(
pFieldInfo
->
final
==
NULL
)
{
TAOS_FIELD
*
f
=
calloc
(
pFieldInfo
->
numOfOutput
,
sizeof
(
TAOS_FIELD
));
int32_t
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
pFieldInfo
->
numOfOutput
;
++
i
)
{
SInternalField
*
pField
=
tscFieldInfoGetInternalField
(
pFieldInfo
,
i
);
if
(
pField
->
visible
)
{
f
[
j
++
]
=
pField
->
field
;
}
}
pFieldInfo
->
final
=
f
;
}
return
pFieldInfo
->
final
;
}
int
taos_retrieve
(
TAOS_RES
*
res
)
{
...
...
src/client/src/tscStream.c
浏览文件 @
57417c7f
...
...
@@ -168,8 +168,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pStream
->
pSql
->
cmd
,
0
,
0
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
true
);
taosTFree
(
pTableMetaInfo
->
vgroupList
);
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClear
(
pTableMetaInfo
->
vgroupList
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelay
);
return
;
}
...
...
@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult
(
pSql
);
taosTFree
(
pSql
->
pSubs
);
pSql
->
subState
.
numOfSub
=
0
;
taosTFree
(
pTableMetaInfo
->
vgroupList
);
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClear
(
pTableMetaInfo
->
vgroupList
);
tscSetNextLaunchTimer
(
pStream
,
pSql
);
}
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
57417c7f
...
...
@@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo
info
=
{{
0
}};
for
(
int32_t
m
=
0
;
m
<
pvg
->
numOfVgroups
;
++
m
)
{
if
(
tt
->
vgId
==
pvg
->
vgroups
[
m
].
vgId
)
{
info
.
vgInfo
=
pvg
->
vgroups
[
m
]
;
tscSCMVgroupInfoCopy
(
&
info
.
vgInfo
,
&
pvg
->
vgroups
[
m
])
;
break
;
}
}
...
...
@@ -1142,7 +1142,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static
SSqlObj
*
tscCreateSTableSubquery
(
SSqlObj
*
pSql
,
SRetrieveSupport
*
trsupport
,
SSqlObj
*
prevSqlObj
);
// TODO
int32_t
tscCreateJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
SJoinSupporter
*
pSupporter
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
...
...
@@ -1298,14 +1297,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
assert
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_SUBQUERY
)
==
0
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
// todo add test
// SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
// if (pState == NULL) {
// code = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
// }
pSql
->
subState
.
numOfSub
=
pQueryInfo
->
numOfTables
;
bool
hasEmptySub
=
false
;
...
...
@@ -1645,9 +1636,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk
uint32_t
numOfRowsFromSubquery
=
(
uint32_t
)(
trsupport
->
pExtMemBuffer
[
idx
]
->
numOfTotalElems
+
trsupport
->
localBuffer
->
num
);
tscDebug
(
"%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d"
,
pParentSql
,
pSql
,
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
].
epAddr
[
0
].
fqdn
,
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
].
vgId
,
numOfRowsFromSubquery
,
idx
);
SVgroupsInfo
*
vgroupsInfo
=
pTableMetaInfo
->
vgroupList
;
tscDebug
(
"%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d"
,
pParentSql
,
pSql
,
vgroupsInfo
->
vgroups
[
0
].
epAddr
[
0
].
fqdn
,
vgroupsInfo
->
vgroups
[
0
].
vgId
,
numOfRowsFromSubquery
,
idx
);
tColModelCompact
(
pDesc
->
pColumnModel
,
trsupport
->
localBuffer
,
pDesc
->
pColumnModel
->
capacity
);
...
...
@@ -2022,7 +2013,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
static
char
*
getResultBlockPosition
(
SSqlCmd
*
pCmd
,
SSqlRes
*
pRes
,
int32_t
columnIndex
,
int16_t
*
bytes
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
S
FieldSupInfo
*
pInfo
=
(
SFieldSupInfo
*
)
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
columnIndex
);
S
InternalField
*
pInfo
=
(
SInternalField
*
)
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
internalField
,
columnIndex
);
assert
(
pInfo
->
pSqlExpr
!=
NULL
);
*
bytes
=
pInfo
->
pSqlExpr
->
resBytes
;
...
...
@@ -2172,7 +2163,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
size_t
size
=
tscNumOfFields
(
pQueryInfo
);
for
(
int
i
=
0
;
i
<
size
;
++
i
)
{
S
FieldSupInfo
*
pSup
=
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
,
i
);
S
InternalField
*
pSup
=
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
internalField
,
i
);
if
(
pSup
->
pSqlExpr
!=
NULL
)
{
tscGetResultColumnChr
(
pRes
,
&
pQueryInfo
->
fieldsInfo
,
i
);
}
...
...
@@ -2182,7 +2173,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
continue
;
}
TAOS_FIELD
*
pField
=
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
pFields
,
i
);
TAOS_FIELD
*
pField
=
TARRAY_GET_ELEM
(
pQueryInfo
->
fieldsInfo
.
internalField
,
i
);
if
(
pRes
->
tsrow
[
i
]
!=
NULL
&&
pField
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
transferNcharData
(
pSql
,
i
,
pField
);
}
...
...
src/client/src/tscSystem.c
浏览文件 @
57417c7f
...
...
@@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
return
0
;
}
void
taos_init_imp
(
void
)
{
char
temp
[
128
];
...
...
@@ -124,8 +125,9 @@ void taos_init_imp(void) {
double
factor
=
(
tscEmbedded
==
0
)
?
2
.
0
:
4
.
0
;
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
if
(
tscNumOfThreads
<
2
)
tscNumOfThreads
=
2
;
if
(
tscNumOfThreads
<
2
)
{
tscNumOfThreads
=
2
;
}
tscQhandle
=
taosInitScheduler
(
queueSize
,
tscNumOfThreads
,
"tsc"
);
if
(
NULL
==
tscQhandle
)
{
...
...
@@ -140,7 +142,7 @@ void taos_init_imp(void) {
int64_t
refreshTime
=
10
;
// 10 seconds by default
if
(
tscMetaCache
==
NULL
)
{
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
NULL
,
"tableMeta"
);
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
tscFreeTableMetaHelper
,
"tableMeta"
);
tscObjCache
=
taosCacheInit
(
TSDB_CACHE_PTR_KEY
,
refreshTime
/
2
,
false
,
tscFreeRegisteredSqlObj
,
"sqlObj"
);
}
...
...
src/client/src/tscUtil.c
浏览文件 @
57417c7f
...
...
@@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) {
}
}
void
tscFreeTableMetaHelper
(
void
*
pTableMeta
)
{
STableMeta
*
p
=
(
STableMeta
*
)
pTableMeta
;
int32_t
numOfEps
=
p
->
vgroupInfo
.
numOfEps
;
assert
(
numOfEps
>=
0
&&
numOfEps
<=
TSDB_MAX_REPLICA
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
++
i
)
{
taosTFree
(
p
->
vgroupInfo
.
epAddr
[
i
].
fqdn
);
}
int32_t
numOfEps1
=
p
->
corVgroupInfo
.
numOfEps
;
assert
(
numOfEps1
>=
0
&&
numOfEps1
<=
TSDB_MAX_REPLICA
);
for
(
int32_t
i
=
0
;
i
<
numOfEps1
;
++
i
)
{
taosTFree
(
p
->
corVgroupInfo
.
epAddr
[
i
].
fqdn
);
}
}
void
tscFreeSqlObj
(
SSqlObj
*
pSql
)
{
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
return
;
...
...
@@ -686,16 +704,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SArray
*
pTableDataBlockList
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks
*
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
0
);
int32_t
expandSize
=
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
);
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
size_t
total
=
taosArrayGetSize
(
pTableDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
i
);
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks
*
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
i
);
int32_t
expandSize
=
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
);
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
...
...
@@ -829,35 +845,30 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return
f
;
}
S
FieldSupInfo
*
tscFieldInfoAppend
(
SFieldInfo
*
pFieldInfo
,
TAOS_FIELD
*
pField
)
{
S
InternalField
*
tscFieldInfoAppend
(
SFieldInfo
*
pFieldInfo
,
TAOS_FIELD
*
pField
)
{
assert
(
pFieldInfo
!=
NULL
);
taosArrayPush
(
pFieldInfo
->
pFields
,
pField
);
pFieldInfo
->
numOfOutput
++
;
struct
S
FieldSupInfo
info
=
{
struct
S
InternalField
info
=
{
.
pSqlExpr
=
NULL
,
.
pArithExprInfo
=
NULL
,
.
visible
=
true
,
};
return
taosArrayPush
(
pFieldInfo
->
pSupportInfo
,
&
info
);
}
SFieldSupInfo
*
tscFieldInfoGetSupp
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
)
{
return
TARRAY_GET_ELEM
(
pFieldInfo
->
pSupportInfo
,
index
);
info
.
field
=
*
pField
;
return
taosArrayPush
(
pFieldInfo
->
internalField
,
&
info
);
}
SFieldSupInfo
*
tscFieldInfoInsert
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
field
)
{
taosArrayInsert
(
pFieldInfo
->
pFields
,
index
,
field
);
SInternalField
*
tscFieldInfoInsert
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
TAOS_FIELD
*
field
)
{
pFieldInfo
->
numOfOutput
++
;
struct
SFieldSupInfo
info
=
{
struct
SInternalField
info
=
{
.
pSqlExpr
=
NULL
,
.
pArithExprInfo
=
NULL
,
.
visible
=
true
,
};
return
taosArrayInsert
(
pFieldInfo
->
pSupportInfo
,
index
,
&
info
);
info
.
field
=
*
field
;
return
taosArrayInsert
(
pFieldInfo
->
internalField
,
index
,
&
info
);
}
void
tscFieldInfoUpdateOffset
(
SQueryInfo
*
pQueryInfo
)
{
...
...
@@ -891,29 +902,18 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
}
}
void
tscFieldInfoCopy
(
SFieldInfo
*
dst
,
const
SFieldInfo
*
src
)
{
dst
->
numOfOutput
=
src
->
numOfOutput
;
if
(
dst
->
pFields
==
NULL
)
{
dst
->
pFields
=
taosArrayClone
(
src
->
pFields
);
}
else
{
taosArrayCopy
(
dst
->
pFields
,
src
->
pFields
);
}
if
(
dst
->
pSupportInfo
==
NULL
)
{
dst
->
pSupportInfo
=
taosArrayClone
(
src
->
pSupportInfo
);
}
else
{
taosArrayCopy
(
dst
->
pSupportInfo
,
src
->
pSupportInfo
);
}
SInternalField
*
tscFieldInfoGetInternalField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
)
{
assert
(
index
<
pFieldInfo
->
numOfOutput
);
return
TARRAY_GET_ELEM
(
pFieldInfo
->
internalField
,
index
);
}
TAOS_FIELD
*
tscFieldInfoGetField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
)
{
assert
(
index
<
pFieldInfo
->
numOfOutput
);
return
TARRAY_GET_ELEM
(
pFieldInfo
->
pFields
,
index
)
;
return
&
((
SInternalField
*
)
TARRAY_GET_ELEM
(
pFieldInfo
->
internalField
,
index
))
->
field
;
}
int16_t
tscFieldInfoGetOffset
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
)
{
S
FieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
index
);
S
InternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
index
);
assert
(
pInfo
!=
NULL
&&
pInfo
->
pSqlExpr
!=
NULL
);
return
pInfo
->
pSqlExpr
->
offset
;
...
...
@@ -960,10 +960,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
return
;
}
taosArrayDestroy
(
pFieldInfo
->
pFields
);
for
(
int32_t
i
=
0
;
i
<
pFieldInfo
->
numOfOutput
;
++
i
)
{
S
FieldSupInfo
*
pInfo
=
taosArrayGet
(
pFieldInfo
->
pSupportInfo
,
i
);
S
InternalField
*
pInfo
=
taosArrayGet
(
pFieldInfo
->
internalField
,
i
);
if
(
pInfo
->
pArithExprInfo
!=
NULL
)
{
tExprTreeDestroy
(
&
pInfo
->
pArithExprInfo
->
pExpr
,
NULL
);
...
...
@@ -971,7 +969,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
}
}
taosArrayDestroy
(
pFieldInfo
->
pSupportInfo
);
taosArrayDestroy
(
pFieldInfo
->
internalField
);
taosTFree
(
pFieldInfo
->
final
);
memset
(
pFieldInfo
,
0
,
sizeof
(
SFieldInfo
));
}
...
...
@@ -1615,11 +1615,8 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
}
void
tscInitQueryInfo
(
SQueryInfo
*
pQueryInfo
)
{
assert
(
pQueryInfo
->
fieldsInfo
.
pFields
==
NULL
);
pQueryInfo
->
fieldsInfo
.
pFields
=
taosArrayInit
(
4
,
sizeof
(
TAOS_FIELD
));
assert
(
pQueryInfo
->
fieldsInfo
.
pSupportInfo
==
NULL
);
pQueryInfo
->
fieldsInfo
.
pSupportInfo
=
taosArrayInit
(
4
,
sizeof
(
SFieldSupInfo
));
assert
(
pQueryInfo
->
fieldsInfo
.
internalField
==
NULL
);
pQueryInfo
->
fieldsInfo
.
internalField
=
taosArrayInit
(
4
,
sizeof
(
SInternalField
));
assert
(
pQueryInfo
->
exprList
==
NULL
);
pQueryInfo
->
exprList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
...
...
@@ -1682,8 +1679,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
void
tscFreeVgroupTableInfo
(
SArray
*
pVgroupTables
)
{
if
(
pVgroupTables
!=
NULL
)
{
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pVgroupTables
);
i
++
)
{
size_t
num
=
taosArrayGetSize
(
pVgroupTables
);
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTables
,
i
);
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
taosTFree
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
taosArrayDestroy
(
pInfo
->
itemList
);
}
taosArrayDestroy
(
pVgroupTables
);
...
...
@@ -1695,6 +1698,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
tscFreeVgroupTableInfo
(
pTableMetaInfo
->
pVgroupTables
);
tscClearTableMetaInfo
(
pTableMetaInfo
,
removeFromCache
);
free
(
pTableMetaInfo
);
...
...
@@ -1727,13 +1731,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo
->
pTableMeta
=
pTableMeta
;
if
(
vgroupList
!=
NULL
)
{
size_t
size
=
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SCMVgroupInfo
)
*
vgroupList
->
numOfVgroups
;
pTableMetaInfo
->
vgroupList
=
malloc
(
size
);
if
(
pTableMetaInfo
->
vgroupList
==
NULL
)
{
return
NULL
;
}
memcpy
(
pTableMetaInfo
->
vgroupList
,
vgroupList
,
size
);
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClone
(
vgroupList
);
}
pTableMetaInfo
->
tagColList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
...
...
@@ -1762,8 +1760,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
removeFromCache
);
}
taosTFree
(
pTableMetaInfo
->
vgroupList
);
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClear
(
pTableMetaInfo
->
vgroupList
);
tscColumnListDestroy
(
pTableMetaInfo
->
tagColList
);
pTableMetaInfo
->
tagColList
=
NULL
;
}
...
...
@@ -1831,54 +1828,23 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return
pNew
;
}
// current sql function is not direct output result, so create a dummy output field
static
void
doSetNewFieldInfo
(
SQueryInfo
*
pNewQueryInfo
,
SSqlExpr
*
pExpr
)
{
TAOS_FIELD
f
=
{.
type
=
(
uint8_t
)
pExpr
->
resType
,
.
bytes
=
pExpr
->
resBytes
};
tstrncpy
(
f
.
name
,
pExpr
->
aliasName
,
sizeof
(
f
.
name
));
SFieldSupInfo
*
pInfo1
=
tscFieldInfoAppend
(
&
pNewQueryInfo
->
fieldsInfo
,
&
f
);
pInfo1
->
pSqlExpr
=
pExpr
;
pInfo1
->
visible
=
false
;
}
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
pQueryInfo
,
SQueryInfo
*
pNewQueryInfo
,
int64_t
uid
)
{
int32_t
numOfOutput
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
if
(
numOfOutput
==
0
)
{
return
;
}
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
SFieldInfo
*
pFieldInfo
=
&
pQueryInfo
->
fieldsInfo
;
// set the field info in pNewQueryInfo object
// set the field info in pNewQueryInfo object according to sqlExpr information
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
p
New
QueryInfo
,
i
);
if
(
pExpr
->
uid
==
uid
)
{
if
(
i
<
pFieldInfo
->
numOfOutput
)
{
SFieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
pFieldInfo
,
i
);
if
(
pInfo
->
pSqlExpr
!=
NULL
)
{
TAOS_FIELD
*
p
=
tscFieldInfoGetField
(
pFieldInfo
,
i
);
assert
(
strcmp
(
p
->
name
,
pExpr
->
aliasName
)
==
0
);
SFieldSupInfo
*
pInfo1
=
tscFieldInfoAppend
(
&
pNewQueryInfo
->
fieldsInfo
,
p
);
*
pInfo1
=
*
pInfo
;
}
else
{
assert
(
pInfo
->
pArithExprInfo
!=
NULL
);
doSetNewFieldInfo
(
pNewQueryInfo
,
pExpr
);
}
}
else
{
// it is a arithmetic column, does not have actual field for sqlExpr, so build it
doSetNewFieldInfo
(
pNewQueryInfo
,
pExpr
);
}
}
TAOS_FIELD
f
=
tscCreateField
((
int8_t
)
pExpr
->
resType
,
pExpr
->
aliasName
,
pExpr
->
resBytes
);
SInternalField
*
pInfo1
=
tscFieldInfoAppend
(
&
pNewQueryInfo
->
fieldsInfo
,
&
f
);
pInfo1
->
pSqlExpr
=
pExpr
;
}
// make sure the the sqlExpr for each fields is correct
numOfExprs
=
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
// update the pSqlExpr pointer in SFieldSupInfo according the field name
// update the pSqlExpr pointer in SInternalField according the field name
// make sure the pSqlExpr point to the correct SqlExpr in pNewQueryInfo, not SqlExpr in pQueryInfo
for
(
int32_t
f
=
0
;
f
<
pNewQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
f
)
{
TAOS_FIELD
*
field
=
tscFieldInfoGetField
(
&
pNewQueryInfo
->
fieldsInfo
,
f
);
...
...
@@ -1888,7 +1854,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p
SSqlExpr
*
pExpr1
=
tscSqlExprGet
(
pNewQueryInfo
,
k1
);
if
(
strcmp
(
field
->
name
,
pExpr1
->
aliasName
)
==
0
)
{
// establish link according to the result field name
S
FieldSupInfo
*
pInfo
=
tscFieldInfoGetSupp
(
&
pNewQueryInfo
->
fieldsInfo
,
f
);
S
InternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pNewQueryInfo
->
fieldsInfo
,
f
);
pInfo
->
pSqlExpr
=
pExpr1
;
matched
=
true
;
...
...
@@ -2403,3 +2369,58 @@ void tscClearSqlOwner(SSqlObj* pSql) {
assert
(
taosCheckPthreadValid
(
pSql
->
owner
));
atomic_store_64
(
&
pSql
->
owner
,
0
);
}
SVgroupsInfo
*
tscVgroupInfoClone
(
SVgroupsInfo
*
vgroupList
)
{
if
(
vgroupList
==
NULL
)
{
return
NULL
;
}
size_t
size
=
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SCMVgroupInfo
)
*
vgroupList
->
numOfVgroups
;
SVgroupsInfo
*
pNew
=
calloc
(
1
,
size
);
if
(
pNew
==
NULL
)
{
return
NULL
;
}
pNew
->
numOfVgroups
=
vgroupList
->
numOfVgroups
;
for
(
int32_t
i
=
0
;
i
<
vgroupList
->
numOfVgroups
;
++
i
)
{
SCMVgroupInfo
*
pNewVInfo
=
&
pNew
->
vgroups
[
i
];
SCMVgroupInfo
*
pvInfo
=
&
vgroupList
->
vgroups
[
i
];
pNewVInfo
->
vgId
=
pvInfo
->
vgId
;
pNewVInfo
->
numOfEps
=
pvInfo
->
numOfEps
;
for
(
int32_t
j
=
0
;
j
<
pvInfo
->
numOfEps
;
++
j
)
{
pNewVInfo
->
epAddr
[
j
].
fqdn
=
strdup
(
pvInfo
->
epAddr
[
j
].
fqdn
);
pNewVInfo
->
epAddr
[
j
].
port
=
pvInfo
->
epAddr
[
j
].
port
;
}
}
return
pNew
;
}
void
*
tscVgroupInfoClear
(
SVgroupsInfo
*
vgroupList
)
{
if
(
vgroupList
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
vgroupList
->
numOfVgroups
;
++
i
)
{
SCMVgroupInfo
*
pVgroupInfo
=
&
vgroupList
->
vgroups
[
i
];
for
(
int32_t
j
=
0
;
j
<
pVgroupInfo
->
numOfEps
;
++
j
)
{
taosTFree
(
pVgroupInfo
->
epAddr
[
j
].
fqdn
);
}
}
taosTFree
(
vgroupList
);
return
NULL
;
}
void
tscSCMVgroupInfoCopy
(
SCMVgroupInfo
*
dst
,
const
SCMVgroupInfo
*
src
)
{
dst
->
vgId
=
src
->
vgId
;
dst
->
numOfEps
=
src
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
dst
->
numOfEps
;
++
i
)
{
dst
->
epAddr
[
i
].
port
=
src
->
epAddr
[
i
].
port
;
dst
->
epAddr
[
i
].
fqdn
=
strdup
(
src
->
epAddr
[
i
].
fqdn
);
}
}
src/dnode/src/dnodeShell.c
浏览文件 @
57417c7f
...
...
@@ -173,15 +173,15 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
return
rpcRsp
.
code
;
}
void
*
dnodeSendCfgTableToRecv
(
int32_t
vgId
,
int32_t
s
id
)
{
dDebug
(
"vgId:%d,
sid:%d send config table msg to mnode"
,
vgId
,
s
id
);
void
*
dnodeSendCfgTableToRecv
(
int32_t
vgId
,
int32_t
t
id
)
{
dDebug
(
"vgId:%d,
tid:%d send config table msg to mnode"
,
vgId
,
t
id
);
int32_t
contLen
=
sizeof
(
SDMConfigTableMsg
);
SDMConfigTableMsg
*
pMsg
=
rpcMallocCont
(
contLen
);
pMsg
->
dnodeId
=
htonl
(
dnodeGetDnodeId
());
pMsg
->
vgId
=
htonl
(
vgId
);
pMsg
->
sid
=
htonl
(
s
id
);
pMsg
->
tid
=
htonl
(
t
id
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pMsg
;
...
...
@@ -194,18 +194,18 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
if
(
rpcRsp
.
code
!=
0
)
{
rpcFreeCont
(
rpcRsp
.
pCont
);
dError
(
"vgId:%d,
sid:%d failed to config table from mnode"
,
vgId
,
s
id
);
dError
(
"vgId:%d,
tid:%d failed to config table from mnode"
,
vgId
,
t
id
);
return
NULL
;
}
else
{
dInfo
(
"vgId:%d,
sid:%d config table msg is received"
,
vgId
,
s
id
);
dInfo
(
"vgId:%d,
tid:%d config table msg is received"
,
vgId
,
t
id
);
// delete this after debug finished
SMDCreateTableMsg
*
pTable
=
rpcRsp
.
pCont
;
int16_t
numOfColumns
=
htons
(
pTable
->
numOfColumns
);
int16_t
numOfTags
=
htons
(
pTable
->
numOfTags
);
int32_t
sid
=
htonl
(
pTable
->
s
id
);
int32_t
tableId
=
htonl
(
pTable
->
t
id
);
uint64_t
uid
=
htobe64
(
pTable
->
uid
);
dInfo
(
"table:%s, numOfColumns:%d numOfTags:%d
sid:%d uid:%"
PRIu64
,
pTable
->
tableId
,
numOfColumns
,
numOfTags
,
si
d
,
uid
);
dInfo
(
"table:%s, numOfColumns:%d numOfTags:%d
tid:%d uid:%"
PRIu64
,
pTable
->
tableId
,
numOfColumns
,
numOfTags
,
tableI
d
,
uid
);
return
rpcRsp
.
pCont
;
}
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
57417c7f
...
...
@@ -212,7 +212,8 @@ static void *dnodeProcessReadQueue(void *param) {
}
else
{
if
(
code
==
TSDB_CODE_QRY_HAS_RSP
)
{
dnodeSendRpcReadRsp
(
pVnode
,
pReadMsg
,
pReadMsg
->
rpcMsg
.
code
);
}
else
{
// code == TSDB_CODE_NOT_READY, do not return msg to client
}
else
{
// code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert
(
pReadMsg
->
rpcMsg
.
handle
==
NULL
||
(
pReadMsg
->
rpcMsg
.
handle
!=
NULL
&&
pReadMsg
->
rpcMsg
.
msgType
==
5
));
dnodeDispatchNonRspMsg
(
pVnode
,
pReadMsg
,
code
);
}
}
...
...
src/inc/dnode.h
浏览文件 @
57417c7f
...
...
@@ -49,7 +49,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void
dnodeSendMsgToDnode
(
SRpcEpSet
*
epSet
,
SRpcMsg
*
rpcMsg
);
void
dnodeSendMsgToMnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
);
void
dnodeSendMsgToDnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
,
SRpcEpSet
*
epSet
);
void
*
dnodeSendCfgTableToRecv
(
int32_t
vgId
,
int32_t
s
id
);
void
*
dnodeSendCfgTableToRecv
(
int32_t
vgId
,
int32_t
t
id
);
void
*
dnodeAllocateVnodeWqueue
(
void
*
pVnode
);
void
dnodeFreeVnodeWqueue
(
void
*
queue
);
...
...
src/inc/taosmsg.h
浏览文件 @
57417c7f
...
...
@@ -182,10 +182,16 @@ extern char *taosMsg[];
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
typedef
struct
{
char
fqdn
[
TSDB_FQDN_LEN
];
uint16_t
port
;
}
SEpAddr
;
}
SEpAddrMsg
;
typedef
struct
{
char
*
fqdn
;
uint16_t
port
;
}
SEpAddr1
;
typedef
struct
{
int32_t
numOfVnodes
;
...
...
@@ -245,7 +251,7 @@ typedef struct {
int8_t
tableType
;
int16_t
numOfColumns
;
int16_t
numOfTags
;
int32_t
s
id
;
int32_t
t
id
;
int32_t
sversion
;
int32_t
tversion
;
int32_t
tagDataLen
;
...
...
@@ -354,7 +360,7 @@ typedef struct {
typedef
struct
{
int32_t
contLen
;
int32_t
vgId
;
int32_t
s
id
;
int32_t
t
id
;
uint64_t
uid
;
char
tableId
[
TSDB_TABLE_FNAME_LEN
];
}
SMDDropTableMsg
;
...
...
@@ -401,6 +407,7 @@ typedef struct SExprInfo {
int16_t
bytes
;
int16_t
type
;
int32_t
interBytes
;
int64_t
uid
;
}
SExprInfo
;
typedef
struct
SColumnFilterInfo
{
...
...
@@ -662,16 +669,27 @@ typedef struct SCMSTableVgroupMsg {
}
SCMSTableVgroupMsg
,
SCMSTableVgroupRspMsg
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr
1
epAddr
[
TSDB_MAX_REPLICA
];
}
SCMVgroupInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SCMVgroupMsg
;
typedef
struct
{
int32_t
numOfVgroups
;
SCMVgroupInfo
vgroups
[];
}
SVgroupsInfo
;
typedef
struct
{
int32_t
numOfVgroups
;
SCMVgroupMsg
vgroups
[];
}
SVgroupsMsg
;
typedef
struct
STableMetaMsg
{
int32_t
contLen
;
char
tableId
[
TSDB_TABLE_FNAME_LEN
];
// table id
...
...
@@ -682,9 +700,9 @@ typedef struct STableMetaMsg {
int16_t
numOfColumns
;
int16_t
sversion
;
int16_t
tversion
;
int32_t
s
id
;
int32_t
t
id
;
uint64_t
uid
;
SCMVgroup
Info
vgroup
;
SCMVgroup
Msg
vgroup
;
SSchema
schema
[];
}
STableMetaMsg
;
...
...
@@ -730,7 +748,7 @@ typedef struct {
typedef
struct
{
int32_t
dnodeId
;
int32_t
vgId
;
int32_t
s
id
;
int32_t
t
id
;
}
SDMConfigTableMsg
;
typedef
struct
{
...
...
src/mnode/inc/mnodeDef.h
浏览文件 @
57417c7f
...
...
@@ -115,7 +115,7 @@ typedef struct {
uint64_t
suid
;
int64_t
createdTime
;
int32_t
numOfColumns
;
//used by normal table
int32_t
s
id
;
int32_t
t
id
;
int32_t
vgId
;
int32_t
sqlLen
;
int8_t
updateEnd
[
4
];
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
57417c7f
...
...
@@ -303,7 +303,7 @@ static int32_t mnodeChildTableActionRestored() {
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, failed to get vgId:%d
sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
);
mError
(
"ctable:%s, failed to get vgId:%d
tid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
);
pTable
->
vgId
=
0
;
SSdbOper
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
pObj
=
pTable
,
.
table
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
...
...
@@ -314,7 +314,7 @@ static int32_t mnodeChildTableActionRestored() {
if
(
strcmp
(
pVgroup
->
dbName
,
pDb
->
name
)
!=
0
)
{
mError
(
"ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pDb
->
name
,
pTable
->
vgId
,
pVgroup
->
dbName
,
pTable
->
s
id
);
pTable
->
info
.
tableId
,
pDb
->
name
,
pTable
->
vgId
,
pVgroup
->
dbName
,
pTable
->
t
id
);
pTable
->
vgId
=
0
;
SSdbOper
desc
=
{.
type
=
SDB_OPER_LOCAL
,
.
pObj
=
pTable
,
.
table
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
...
...
@@ -771,8 +771,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
return
mnodeProcessDropSuperTableMsg
(
pMsg
);
}
else
{
SChildTableObj
*
pCTable
=
(
SChildTableObj
*
)
pMsg
->
pTable
;
mInfo
(
"app:%p:%p, table:%s, start to drop ctable, vgId:%d
s
id:%d uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
,
pCTable
->
vgId
,
pCTable
->
s
id
,
pCTable
->
uid
);
mInfo
(
"app:%p:%p, table:%s, start to drop ctable, vgId:%d
t
id:%d uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
,
pCTable
->
vgId
,
pCTable
->
t
id
,
pCTable
->
uid
);
return
mnodeProcessDropChildTableMsg
(
pMsg
);
}
}
...
...
@@ -1476,13 +1476,14 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
int32_t
numOfTable
=
htonl
(
pInfo
->
numOfTables
);
// reserve space
int32_t
contLen
=
sizeof
(
SCMSTableVgroupRspMsg
)
+
32
*
sizeof
(
SCMVgroup
Info
)
+
sizeof
(
SVgroupsInfo
);
int32_t
contLen
=
sizeof
(
SCMSTableVgroupRspMsg
)
+
32
*
sizeof
(
SCMVgroup
Msg
)
+
sizeof
(
SVgroupsMsg
);
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
++
i
)
{
char
*
stableName
=
(
char
*
)
pInfo
+
sizeof
(
SCMSTableVgroupMsg
)
+
(
TSDB_TABLE_FNAME_LEN
)
*
i
;
SSuperTableObj
*
pTable
=
mnodeGetSuperTable
(
stableName
);
if
(
pTable
!=
NULL
&&
pTable
->
vgHash
!=
NULL
)
{
contLen
+=
(
taosHashGetSize
(
pTable
->
vgHash
)
*
sizeof
(
SCMVgroup
Info
)
+
sizeof
(
SVgroupsInfo
));
contLen
+=
(
taosHashGetSize
(
pTable
->
vgHash
)
*
sizeof
(
SCMVgroup
Msg
)
+
sizeof
(
SVgroupsMsg
));
}
mnodeDecTableRef
(
pTable
);
}
...
...
@@ -1510,12 +1511,12 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
// even this super table has no corresponding table, still return
pRsp
->
numOfTables
++
;
SVgroups
Info
*
pVgroupInfo
=
(
SVgroupsInfo
*
)
msg
;
pVgroup
Info
->
numOfVgroups
=
0
;
SVgroups
Msg
*
pVgroupMsg
=
(
SVgroupsMsg
*
)
msg
;
pVgroup
Msg
->
numOfVgroups
=
0
;
msg
+=
sizeof
(
SVgroups
Info
);
msg
+=
sizeof
(
SVgroups
Msg
);
}
else
{
SVgroups
Info
*
pVgroupInfo
=
(
SVgroupsInfo
*
)
msg
;
SVgroups
Msg
*
pVgroupMsg
=
(
SVgroupsMsg
*
)
msg
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pTable
->
vgHash
);
int32_t
vgSize
=
0
;
...
...
@@ -1524,15 +1525,17 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
*
pVgId
);
if
(
pVgroup
==
NULL
)
continue
;
pVgroupInfo
->
vgroups
[
vgSize
].
vgId
=
htonl
(
pVgroup
->
vgId
);
pVgroupMsg
->
vgroups
[
vgSize
].
vgId
=
htonl
(
pVgroup
->
vgId
);
pVgroupMsg
->
vgroups
[
vgSize
].
numOfEps
=
0
;
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
numOfVnodes
;
++
vn
)
{
SDnodeObj
*
pDnode
=
pVgroup
->
vnodeGid
[
vn
].
pDnode
;
if
(
pDnode
==
NULL
)
break
;
tstrncpy
(
pVgroup
Info
->
vgroups
[
vgSize
].
epAddr
[
vn
].
fqdn
,
pDnode
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
pVgroup
Info
->
vgroups
[
vgSize
].
epAddr
[
vn
].
port
=
htons
(
pDnode
->
dnodePort
);
tstrncpy
(
pVgroup
Msg
->
vgroups
[
vgSize
].
epAddr
[
vn
].
fqdn
,
pDnode
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
pVgroup
Msg
->
vgroups
[
vgSize
].
epAddr
[
vn
].
port
=
htons
(
pDnode
->
dnodePort
);
pVgroup
Info
->
vgroups
[
vgSize
].
numOfEps
++
;
pVgroup
Msg
->
vgroups
[
vgSize
].
numOfEps
++
;
}
vgSize
++
;
...
...
@@ -1542,10 +1545,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
taosHashDestroyIter
(
pIter
);
mnodeDecTableRef
(
pTable
);
pVgroup
Info
->
numOfVgroups
=
htonl
(
vgSize
);
pVgroup
Msg
->
numOfVgroups
=
htonl
(
vgSize
);
// one table is done, try the next table
msg
+=
sizeof
(
SVgroups
Info
)
+
vgSize
*
sizeof
(
SCMVgroupInfo
);
msg
+=
sizeof
(
SVgroups
Msg
)
+
vgSize
*
sizeof
(
SCMVgroupMsg
);
pRsp
->
numOfTables
++
;
}
}
...
...
@@ -1595,7 +1598,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
pCreate
->
vgId
=
htonl
(
pTable
->
vgId
);
pCreate
->
tableType
=
pTable
->
info
.
type
;
pCreate
->
createdTime
=
htobe64
(
pTable
->
createdTime
);
pCreate
->
sid
=
htonl
(
pTable
->
s
id
);
pCreate
->
tid
=
htonl
(
pTable
->
t
id
);
pCreate
->
sqlDataLen
=
htonl
(
pTable
->
sqlLen
);
pCreate
->
uid
=
htobe64
(
pTable
->
uid
);
...
...
@@ -1644,7 +1647,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
assert
(
pTable
);
mDebug
(
"app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
);
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
);
SCMCreateTableMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
SMDCreateTableMsg
*
pMDCreate
=
mnodeBuildCreateChildTableMsg
(
pCreate
,
pTable
);
...
...
@@ -1686,7 +1689,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
else
{
mError
(
"app:%p:%p, table:%s, failed to create table sid:%d, uid:%"
PRIu64
", reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pTable
->
s
id
,
pTable
->
uid
,
tstrerror
(
code
));
pTable
->
info
.
tableId
,
pTable
->
t
id
,
pTable
->
uid
,
tstrerror
(
code
));
SSdbOper
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pTable
,
.
table
=
tsChildTableSdb
};
sdbDeleteRow
(
&
desc
);
return
code
;
...
...
@@ -1710,7 +1713,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pTable
->
info
.
tableId
=
strdup
(
pCreate
->
tableId
);
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
s
id
=
tid
;
pTable
->
t
id
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
...
...
@@ -1724,7 +1727,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
}
pTable
->
suid
=
pMsg
->
pSTable
->
uid
;
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
48
)
+
((((
uint64_t
)
pTable
->
s
id
)
&
((
1ul
<<
24
)
-
1ul
))
<<
24
)
+
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
48
)
+
((((
uint64_t
)
pTable
->
t
id
)
&
((
1ul
<<
24
)
-
1ul
))
<<
24
)
+
((
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
pTable
->
superTable
=
pMsg
->
pSTable
;
}
else
{
...
...
@@ -1732,7 +1735,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
int64_t
us
=
taosGetTimestampUs
();
pTable
->
uid
=
(
us
<<
24
)
+
((
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
}
else
{
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
48
)
+
((((
uint64_t
)
pTable
->
s
id
)
&
((
1ul
<<
24
)
-
1ul
))
<<
24
)
+
pTable
->
uid
=
(((
uint64_t
)
pTable
->
vgId
)
<<
48
)
+
((((
uint64_t
)
pTable
->
t
id
)
&
((
1ul
<<
24
)
-
1ul
))
<<
24
)
+
((
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
}
...
...
@@ -1789,7 +1792,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
tstrerror
(
code
));
}
else
{
mDebug
(
"app:%p:%p, table:%s, allocated in vgroup, vgId:%d sid:%d uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
);
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
);
}
return
code
;
...
...
@@ -1807,8 +1810,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if
(
pMsg
->
retry
==
0
)
{
if
(
pMsg
->
pTable
==
NULL
)
{
SVgObj
*
pVgroup
=
NULL
;
int32_t
s
id
=
0
;
code
=
mnodeGetAvailableVgroup
(
pMsg
,
&
pVgroup
,
&
s
id
);
int32_t
t
id
=
0
;
code
=
mnodeGetAvailableVgroup
(
pMsg
,
&
pVgroup
,
&
t
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mDebug
(
"app:%p:%p, table:%s, failed to get available vgroup, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pCreate
->
tableId
,
tstrerror
(
code
));
...
...
@@ -1822,7 +1825,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
pMsg
->
pVgroup
=
pVgroup
;
mnodeIncVgroupRef
(
pVgroup
);
return
mnodeDoCreateChildTable
(
pMsg
,
s
id
);
return
mnodeDoCreateChildTable
(
pMsg
,
t
id
);
}
}
else
{
if
(
pMsg
->
pTable
==
NULL
)
pMsg
->
pTable
=
mnodeGetTable
(
pCreate
->
tableId
);
...
...
@@ -1852,13 +1855,13 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
tstrncpy
(
pDrop
->
tableId
,
pTable
->
info
.
tableId
,
TSDB_TABLE_FNAME_LEN
);
pDrop
->
vgId
=
htonl
(
pTable
->
vgId
);
pDrop
->
contLen
=
htonl
(
sizeof
(
SMDDropTableMsg
));
pDrop
->
sid
=
htonl
(
pTable
->
s
id
);
pDrop
->
tid
=
htonl
(
pTable
->
t
id
);
pDrop
->
uid
=
htobe64
(
pTable
->
uid
);
SRpcEpSet
epSet
=
mnodeGetEpSetFromVgroup
(
pMsg
->
pVgroup
);
mInfo
(
"app:%p:%p, ctable:%s, send drop ctable msg, vgId:%d sid:%d uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
);
pDrop
->
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
);
SRpcMsg
rpcMsg
=
{
.
ahandle
=
pMsg
,
...
...
@@ -2097,7 +2100,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pMsg
->
pTable
;
pMeta
->
uid
=
htobe64
(
pTable
->
uid
);
pMeta
->
sid
=
htonl
(
pTable
->
s
id
);
pMeta
->
tid
=
htonl
(
pTable
->
t
id
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
pMeta
->
tableType
=
pTable
->
info
.
type
;
tstrncpy
(
pMeta
->
tableId
,
pTable
->
info
.
tableId
,
TSDB_TABLE_FNAME_LEN
);
...
...
@@ -2137,7 +2140,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
pMeta
->
vgroup
.
vgId
=
htonl
(
pMsg
->
pVgroup
->
vgId
);
mDebug
(
"app:%p:%p, table:%s, uid:%"
PRIu64
" table meta is retrieved, vgId:%d sid:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
pTable
->
uid
,
pTable
->
vgId
,
pTable
->
s
id
);
pTable
->
info
.
tableId
,
pTable
->
uid
,
pTable
->
vgId
,
pTable
->
t
id
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2289,11 +2292,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) {
}
#if 0
static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t
s
id) {
static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t
t
id) {
SVgObj *pVgroup = mnodeGetVgroup(vnode);
if (pVgroup == NULL) return NULL;
SChildTableObj *pTable = pVgroup->tableList[
s
id - 1];
SChildTableObj *pTable = pVgroup->tableList[
t
id - 1];
mnodeIncTableRef((STableObj *)pTable);
mnodeDecVgroupRef(pVgroup);
...
...
@@ -2341,12 +2344,12 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
assert
(
pTable
);
mInfo
(
"app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%"
PRIu64
", thandle:%p result:%s"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
,
mnodeMsg
->
rpcMsg
.
handle
,
tstrerror
(
rpcMsg
->
code
));
if
(
rpcMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%"
PRIu64
", reason:%s"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
));
dnodeSendRpcMnodeWriteRsp
(
mnodeMsg
,
rpcMsg
->
code
);
return
;
...
...
@@ -2384,7 +2387,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if
(
sdbCheckRowDeleted
(
tsChildTableSdb
,
pTable
))
{
mDebug
(
"app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%"
PRIu64
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
);
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
);
// if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId
// so the refCount of vgroup can not be decreased
...
...
@@ -2419,13 +2422,13 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if
(
mnodeMsg
->
retry
++
<
10
)
{
mDebug
(
"app:%p:%p, table:%s, create table rsp received, need retry, times:%d vgId:%d sid:%d uid:%"
PRIu64
" result:%s thandle:%p"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
mnodeMsg
->
retry
,
pTable
->
vgId
,
pTable
->
s
id
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
mnodeMsg
->
retry
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
);
dnodeDelayReprocessMnodeWriteMsg
(
mnodeMsg
);
}
else
{
mError
(
"app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%"
PRIu64
", result:%s thandle:%p"
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
s
id
,
pTable
->
uid
,
mnodeMsg
->
rpcMsg
.
ahandle
,
mnodeMsg
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
t
id
,
pTable
->
uid
,
tstrerror
(
rpcMsg
->
code
),
mnodeMsg
->
rpcMsg
.
handle
);
SSdbOper
oper
=
{.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
};
...
...
@@ -2680,7 +2683,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
// tid
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pTable
->
s
id
;
*
(
int32_t
*
)
pWrite
=
pTable
->
t
id
;
cols
++
;
//vgid
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
57417c7f
...
...
@@ -793,12 +793,12 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
void
mnodeAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SChildTableObj
*
pTable
)
{
int32_t
idPoolSize
=
taosIdPoolMaxSize
(
pVgroup
->
idPool
);
if
(
pTable
->
s
id
>
idPoolSize
)
{
if
(
pTable
->
t
id
>
idPoolSize
)
{
mnodeAllocVgroupIdPool
(
pVgroup
);
}
if
(
pTable
->
s
id
>=
1
)
{
taosIdPoolMarkStatus
(
pVgroup
->
idPool
,
pTable
->
s
id
);
if
(
pTable
->
t
id
>=
1
)
{
taosIdPoolMarkStatus
(
pVgroup
->
idPool
,
pTable
->
t
id
);
pVgroup
->
numOfTables
++
;
// The create vgroup message may be received later than the create table message
// and the writing order in sdb is therefore uncertain
...
...
@@ -808,8 +808,8 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
}
void
mnodeRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SChildTableObj
*
pTable
)
{
if
(
pTable
->
s
id
>=
1
)
{
taosFreeId
(
pVgroup
->
idPool
,
pTable
->
s
id
);
if
(
pTable
->
t
id
>=
1
)
{
taosFreeId
(
pVgroup
->
idPool
,
pTable
->
t
id
);
pVgroup
->
numOfTables
--
;
// The create vgroup message may be received later than the create table message
// and the writing order in sdb is therefore uncertain
...
...
src/query/inc/qExecutor.h
浏览文件 @
57417c7f
...
...
@@ -213,6 +213,7 @@ typedef struct SQInfo {
void
*
pBuf
;
// allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
tsem_t
ready
;
int32_t
dataReady
;
// denote if query result is ready or not
void
*
rspContext
;
// response context
}
SQInfo
;
...
...
src/query/inc/qSqlparser.h
浏览文件 @
57417c7f
...
...
@@ -276,8 +276,6 @@ tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr);
SAlterTableSQL
*
tAlterTableSQLElems
(
SStrToken
*
pMeterName
,
tFieldList
*
pCols
,
tVariantList
*
pVals
,
int32_t
type
);
tSQLExprListList
*
tSQLListListAppend
(
tSQLExprListList
*
pList
,
tSQLExprList
*
pExprList
);
void
destroyAllSelectClause
(
SSubclauseInfo
*
pSql
);
void
doDestroyQuerySql
(
SQuerySQL
*
pSql
);
...
...
src/query/src/qAst.c
浏览文件 @
57417c7f
...
...
@@ -646,9 +646,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
}
// handle the leaf node
assert
(
pLeft
->
nodeType
==
TSQL_NODE_COL
&&
pRight
->
nodeType
==
TSQL_NODE_VALUE
);
param
->
setupInfoFn
(
pExpr
,
param
->
pExtInfo
);
return
param
->
nodeFilterFn
(
pItem
,
pExpr
->
_node
.
info
);
}
...
...
@@ -769,6 +767,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
assert
(
taosArrayGetSize
(
result
)
==
0
);
tSQLBinaryTraverseOnSkipList
(
pExpr
,
result
,
pSkipList
,
param
);
}
return
;
}
...
...
src/query/src/qExecutor.c
浏览文件 @
57417c7f
...
...
@@ -6227,7 +6227,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pQInfo
->
arrTableIdInfo
=
taosArrayInit
(
tableIndex
,
sizeof
(
STableIdInfo
));
pQInfo
->
dataReady
=
QUERY_RESULT_NOT_READY
;
pQInfo
->
rspContext
=
NULL
;
pthread_mutex_init
(
&
pQInfo
->
lock
,
NULL
);
tsem_init
(
&
pQInfo
->
ready
,
0
,
0
);
pQuery
->
pos
=
-
1
;
pQuery
->
window
=
pQueryMsg
->
window
;
...
...
@@ -6692,12 +6694,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pQInfo
->
dataReady
=
QUERY_RESULT_READY
;
buildRes
=
(
pQInfo
->
rspContext
!=
NULL
);
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
// clear qhandle owner
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed.
assert
(
pQInfo
->
owner
==
taosGetPthreadId
());
pQInfo
->
owner
=
0
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
tsem_post
(
&
pQInfo
->
ready
);
return
buildRes
;
}
...
...
@@ -6761,18 +6765,24 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
qError
(
"QInfo:%p invalid qhandle"
,
pQInfo
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed, code:%d"
,
pQInfo
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
#if 0
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
...
...
@@ -6781,10 +6791,17 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
#else
tsem_wait
(
&
pQInfo
->
ready
);
*
buildRes
=
true
;
code
=
pQInfo
->
code
;
#endif
return
code
;
}
...
...
src/query/src/qParserImpl.c
浏览文件 @
57417c7f
...
...
@@ -130,13 +130,15 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
tVariantCreate
(
&
pSQLExpr
->
val
,
pToken
);
pSQLExpr
->
nSQLOptr
=
optrType
;
}
else
if
(
optrType
==
TK_NOW
)
{
//
default use microsecond
//
use microsecond by default
pSQLExpr
->
val
.
i64Key
=
taosGetTimestamp
(
TSDB_TIME_PRECISION_MICRO
);
pSQLExpr
->
val
.
nType
=
TSDB_DATA_TYPE_BIGINT
;
pSQLExpr
->
nSQLOptr
=
TK_TIMESTAMP
;
// TK_TIMESTAMP used to denote the time value is in microsecond
}
else
if
(
optrType
==
TK_VARIABLE
)
{
int32_t
ret
=
parseAbsoluteDuration
(
pToken
->
z
,
pToken
->
n
,
&
pSQLExpr
->
val
.
i64Key
);
UNUSED
(
ret
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
}
pSQLExpr
->
val
.
nType
=
TSDB_DATA_TYPE_BIGINT
;
pSQLExpr
->
nSQLOptr
=
TK_TIMESTAMP
;
...
...
@@ -148,6 +150,7 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
pSQLExpr
->
nSQLOptr
=
optrType
;
}
return
pSQLExpr
;
}
...
...
@@ -532,26 +535,6 @@ SQuerySQL *tSetQuerySQLElems(SStrToken *pSelectToken, tSQLExprList *pSelection,
return
pQuery
;
}
tSQLExprListList
*
tSQLListListAppend
(
tSQLExprListList
*
pList
,
tSQLExprList
*
pExprList
)
{
if
(
pList
==
NULL
)
pList
=
calloc
(
1
,
sizeof
(
tSQLExprListList
));
if
(
pList
->
nAlloc
<=
pList
->
nList
)
{
//
pList
->
nAlloc
=
(
pList
->
nAlloc
<<
1
)
+
4
;
pList
->
a
=
realloc
(
pList
->
a
,
pList
->
nAlloc
*
sizeof
(
pList
->
a
[
0
]));
if
(
pList
->
a
==
0
)
{
pList
->
nList
=
pList
->
nAlloc
=
0
;
return
pList
;
}
}
assert
(
pList
->
a
!=
0
);
if
(
pExprList
)
{
pList
->
a
[
pList
->
nList
++
]
=
pExprList
;
}
return
pList
;
}
void
doDestroyQuerySql
(
SQuerySQL
*
pQuerySql
)
{
if
(
pQuerySql
==
NULL
)
{
return
;
...
...
src/query/src/qPercentile.c
浏览文件 @
57417c7f
...
...
@@ -365,7 +365,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
taosTFree
(
pBucket
);
}
void
tMemBucketUpdateBoundingBox
(
MinMaxEntry
*
r
,
char
*
data
,
int32_t
dataType
)
{
void
tMemBucketUpdateBoundingBox
(
MinMaxEntry
*
r
,
c
onst
c
har
*
data
,
int32_t
dataType
)
{
switch
(
dataType
)
{
case
TSDB_DATA_TYPE_INT
:
{
int32_t
val
=
*
(
int32_t
*
)
data
;
...
...
src/query/src/qSyntaxtreefunction.c
浏览文件 @
57417c7f
...
...
@@ -1247,7 +1247,10 @@ _bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t
case
TSDB_BINARY_OP_REMAINDER
:
return
rem_function_arraylist
[
leftType
][
rightType
];
default:
assert
(
0
);
return
NULL
;
}
assert
(
0
);
return
NULL
;
}
src/rpc/src/rpcMain.c
浏览文件 @
57417c7f
...
...
@@ -412,7 +412,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcLockConn
(
pConn
);
if
(
pConn
->
inType
==
0
||
pConn
->
user
[
0
]
==
0
)
{
t
Debug
(
"%s, connection is already released, rsp wont be sent"
,
pConn
->
info
);
t
Error
(
"%s, connection is already released, rsp wont be sent"
,
pConn
->
info
);
rpcUnlockConn
(
pConn
);
rpcFreeCont
(
pMsg
->
pCont
);
rpcDecRef
(
pRpc
);
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
57417c7f
...
...
@@ -239,7 +239,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
return
NULL
;
}
if
(
tsdbInitTableCfg
(
pCfg
,
pMsg
->
tableType
,
htobe64
(
pMsg
->
uid
),
htonl
(
pMsg
->
s
id
))
<
0
)
goto
_err
;
if
(
tsdbInitTableCfg
(
pCfg
,
pMsg
->
tableType
,
htobe64
(
pMsg
->
uid
),
htonl
(
pMsg
->
t
id
))
<
0
)
goto
_err
;
if
(
tdInitTSchemaBuilder
(
&
schemaBuilder
,
htonl
(
pMsg
->
sversion
))
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
57417c7f
...
...
@@ -14,7 +14,8 @@
*/
#define _DEFAULT_SOURCE
//#include <dnode.h>
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h"
#include "tglobal.h"
...
...
@@ -77,8 +78,7 @@ int32_t vnodeCheckRead(void *param) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
vnodePutItemIntoReadQueue
(
SVnodeObj
*
pVnode
,
void
**
qhandle
)
{
static
int32_t
vnodePutItemIntoReadQueue
(
SVnodeObj
*
pVnode
,
void
**
qhandle
,
void
*
ahandle
)
{
int32_t
code
=
vnodeCheckRead
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
return
code
;
...
...
@@ -86,7 +86,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
pRead
->
pCont
=
qhandle
;
pRead
->
contLen
=
0
;
pRead
->
rpcMsg
.
handle
=
NULL
;
pRead
->
rpcMsg
.
ahandle
=
ahandle
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
...
...
@@ -96,14 +96,23 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
vnodeDumpQueryResult
(
SRspRet
*
pRet
,
void
*
pVnode
,
void
**
handle
,
bool
*
freeHandle
)
{
/**
*
* @param pRet response message object
* @param pVnode the vnode object
* @param handle qhandle for executing query
* @param freeHandle free qhandle or not
* @param ahandle sqlObj address at client side
* @return
*/
static
int32_t
vnodeDumpQueryResult
(
SRspRet
*
pRet
,
void
*
pVnode
,
void
**
handle
,
bool
*
freeHandle
,
void
*
ahandle
)
{
bool
continueExec
=
false
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
qDumpRetrieveResult
(
*
handle
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
,
&
continueExec
))
==
TSDB_CODE_SUCCESS
)
{
if
(
continueExec
)
{
*
freeHandle
=
false
;
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
ahandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
*
freeHandle
=
true
;
return
code
;
...
...
@@ -186,7 +195,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if
(
code
==
TSDB_CODE_SUCCESS
)
{
handle
=
qRegisterQInfo
(
pVnode
->
qMgmt
,
(
uint64_t
)
pQInfo
);
if
(
handle
==
NULL
)
{
// failed to register qhandle
, todo add error test case
if
(
handle
==
NULL
)
{
// failed to register qhandle
pRsp
->
code
=
terrno
;
terrno
=
0
;
vError
(
"vgId:%d QInfo:%p register qhandle failed, return to app, code:%s"
,
pVnode
->
vgId
,
(
void
*
)
pQInfo
,
...
...
@@ -212,7 +221,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
handle
!=
NULL
)
{
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
*
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
pReadMsg
->
rpcMsg
.
ahandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRsp
->
code
=
code
;
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
...
...
@@ -225,6 +234,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug
(
"vgId:%d, QInfo:%p, dnode continues to exec query"
,
pVnode
->
vgId
,
*
qhandle
);
#if _NON_BLOCKING_RETRIEVE
bool
freehandle
=
false
;
bool
buildRes
=
qTableQuery
(
*
qhandle
);
// do execute query
...
...
@@ -238,11 +249,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pReadMsg
->
rpcMsg
.
handle
);
// set the real rsp error code
pReadMsg
->
rpcMsg
.
code
=
vnodeDumpQueryResult
(
&
pReadMsg
->
rspRet
,
pVnode
,
qhandle
,
&
freehandle
);
pReadMsg
->
rpcMsg
.
code
=
vnodeDumpQueryResult
(
&
pReadMsg
->
rspRet
,
pVnode
,
qhandle
,
&
freehandle
,
pReadMsg
->
rpcMsg
.
ahandle
);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code
=
TSDB_CODE_QRY_HAS_RSP
;
}
else
{
void
*
h1
=
qGetResultRetrieveMsg
(
*
qhandle
);
assert
(
h1
==
NULL
);
freehandle
=
qQueryCompleted
(
*
qhandle
);
}
...
...
@@ -251,6 +265,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
freehandle
||
(
!
buildRes
))
{
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
qhandle
,
freehandle
);
}
#else
qTableQuery
(
*
qhandle
);
// do execute query
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
qhandle
,
false
);
#endif
}
return
code
;
...
...
@@ -310,12 +328,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
freeHandle
=
true
;
}
else
{
// result is not ready, return immediately
assert
(
buildRes
==
true
);
#if _NON_BLOCKING_RETRIEVE
if
(
!
buildRes
)
{
assert
(
pReadMsg
->
rpcMsg
.
handle
!=
NULL
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
return
TSDB_CODE_QRY_NOT_READY
;
}
#endif
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
handle
,
&
freeHandle
);
// ahandle is the sqlObj pointer
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
handle
,
&
freeHandle
,
pReadMsg
->
rpcMsg
.
ahandle
);
}
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
57417c7f
...
...
@@ -164,7 +164,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
int32_t
code
=
TSDB_CODE_SUCCESS
;
vDebug
(
"vgId:%d, table:%s, start to drop"
,
pVnode
->
vgId
,
pTable
->
tableId
);
STableId
tableId
=
{.
uid
=
htobe64
(
pTable
->
uid
),
.
tid
=
htonl
(
pTable
->
s
id
)};
STableId
tableId
=
{.
uid
=
htobe64
(
pTable
->
uid
),
.
tid
=
htonl
(
pTable
->
t
id
)};
if
(
tsdbDropTable
(
pVnode
->
tsdb
,
tableId
)
<
0
)
code
=
terrno
;
...
...
tests/script/general/parser/col_arithmetic_operation.sim
浏览文件 @
57417c7f
...
...
@@ -91,4 +91,60 @@ endi
sql_error select max(c2*2) from $tb
sql_error select max(c1-c2) from $tb
print =====================> td-1764
sql select sum(c1)/count(*), sum(c1) as b, count(*) as b from $stb interval(1y)
if $rows != 1 then
return -1
endi
if $data00 != @18-01-01 00:00:00.000@ then
return -1
endi
if $data01 != 2.250000000 then
return -1
endi
if $data02 != 225000 then
return -1
endi
sql select first(c1) - last(c1), first(c1) as b, last(c1) as b, min(c1) - max(c1), spread(c1) from ca_stb0 interval(1y)
if $rows != 1 then
return -1
endi
if $data00 != @18-01-01 00:00:00.000@ then
return -1
endi
if $data01 != -9.000000000 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 9 then
return -1
endi
if $data04 != -9.000000000 then
return -1
endi
if $data05 != 9.000000000 then
return -1
endi
sql_error select first(c1, c2) - last(c1, c2) from stb interval(1y)
sql_error select first(ts) - last(ts) from stb interval(1y)
sql_error select top(c1, 2) - last(c1) from stb;
sql_error select stddev(c1) - last(c1) from stb;
sql_error select diff(c1) - last(c1) from stb;
sql_error select first(c7) - last(c7) from stb;
sql_error select first(c8) - last(c8) from stb;
sql_error select first(c9) - last(c9) from stb;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/general/parser/constCol.sim
浏览文件 @
57417c7f
...
...
@@ -353,4 +353,36 @@ sql_error select from t1
sql_error select abc from t1
sql_error select abc as tu from t1
print ========================> td-1756
sql_error select * from t1 where ts>now-1y
sql_error select * from t1 where ts>now-1n
print ========================> td-1752
sql select * from db.st2 where t2 < 200 and t2 is not null;
if $rows != 1 then
return -1
endi
if $data00 != @19-12-09 16:27:35.000@ then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data02 != 1 then
return -1
endi
sql select * from db.st2 where t2 > 200 or t2 is null;
if $rows != 0 then
return -1
endi
sql select * from st2 where t2 < 200 and t2 is null;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/general/parser/tags_dynamically_specifiy.sim
浏览文件 @
57417c7f
...
...
@@ -96,5 +96,10 @@ if $rows != 14 then
return -1
endi
print ===============================> td-1765
sql create table m1(ts timestamp, k int) tags(a binary(4), b nchar(4));
sql create table tm0 using m1 tags('abcd', 'abcd');
sql_error alter table tm0 set tag b = 'abcd1';
sql_error alter table tm0 set tag a = 'abcd1';
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/general/parser/union.sim
浏览文件 @
57417c7f
...
...
@@ -251,7 +251,7 @@ if $rows != 15 then
endi
# first subclause are empty
sql select count(*) as c from union_tb0 where ts
>now+10y
union all select sum(c1) as c from union_tb1;
sql select count(*) as c from union_tb0 where ts
> now + 3650d
union all select sum(c1) as c from union_tb1;
if $rows != 1 then
return -1
endi
...
...
@@ -346,7 +346,7 @@ if $data91 != 99 then
return -1
endi
#
1111111111111111111111111111111111111111111111111
#
=================================================================================================
# two aggregated functions for normal tables
sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2;
if $rows != 2 then
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录