Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
febd427b
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
febd427b
编写于
6月 22, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/2.0tsdb
上级
203238f7
99cb63d1
变更
40
展开全部
隐藏空白更改
内联
并排
Showing
40 changed file
with
788 addition
and
797 deletion
+788
-797
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+4
-10
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+6
-7
src/client/src/TSDBJNIConnector.c
src/client/src/TSDBJNIConnector.c
+7
-2
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+1
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+18
-19
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+13
-9
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+27
-8
src/client/src/tscServer.c
src/client/src/tscServer.c
+45
-186
src/client/src/tscSql.c
src/client/src/tscSql.c
+1
-2
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+23
-21
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+33
-61
src/inc/taosdef.h
src/inc/taosdef.h
+1
-1
src/inc/tsdb.h
src/inc/tsdb.h
+19
-40
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+5
-2
src/kit/shell/src/shellImport.c
src/kit/shell/src/shellImport.c
+4
-1
src/kit/shell/src/shellLinux.c
src/kit/shell/src/shellLinux.c
+10
-10
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+22
-16
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+30
-21
src/mnode/src/mnodeAcct.c
src/mnode/src/mnodeAcct.c
+1
-1
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+3
-2
src/os/linux/src/linuxPlatform.c
src/os/linux/src/linuxPlatform.c
+0
-77
src/os/linux/src/linuxSysPara.c
src/os/linux/src/linuxSysPara.c
+11
-5
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+7
-7
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+149
-153
src/query/src/qUtil.c
src/query/src/qUtil.c
+0
-2
src/query/src/qresultBuf.c
src/query/src/qresultBuf.c
+2
-2
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+10
-11
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+105
-83
src/util/inc/tutil.h
src/util/inc/tutil.h
+2
-0
src/util/src/tlog.c
src/util/src/tlog.c
+1
-1
src/util/src/tnote.c
src/util/src/tnote.c
+3
-1
src/util/src/ttimer.c
src/util/src/ttimer.c
+1
-4
src/util/src/tutil.c
src/util/src/tutil.c
+19
-0
tests/pytest/stream/stream1.py
tests/pytest/stream/stream1.py
+25
-12
tests/pytest/stream/stream2.py
tests/pytest/stream/stream2.py
+44
-17
tests/pytest/table/boundary.py
tests/pytest/table/boundary.py
+1
-1
tests/script/general/parser/commit.sim
tests/script/general/parser/commit.sim
+1
-0
tests/script/general/parser/selectResNum.sim
tests/script/general/parser/selectResNum.sim
+1
-0
tests/script/general/stable/refcount.sim
tests/script/general/stable/refcount.sim
+132
-0
tests/script/unique/cluster/balance2.sim
tests/script/unique/cluster/balance2.sim
+1
-1
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
febd427b
...
...
@@ -89,27 +89,21 @@ typedef struct SVgroupTableInfo {
int32_t
tscCreateDataBlock
(
size_t
initialSize
,
int32_t
rowSize
,
int32_t
startOffset
,
const
char
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
void
tscAppendDataBlock
(
SDataBlockList
*
pList
,
STableDataBlocks
*
pBlocks
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
);
void
tscSortRemoveDataBlockDupRows
(
STableDataBlocks
*
dataBuf
);
SParamInfo
*
tscAddParamToDataBlock
(
STableDataBlocks
*
pDataBlock
,
char
type
,
uint8_t
timePrec
,
short
bytes
,
uint32_t
offset
);
SDataBlockList
*
tscCreateBlockArrayList
();
void
*
tscDestroyBlockArrayList
(
SDataBlockList
*
pList
);
void
*
tscDestroyBlockArrayList
(
SArray
*
pDataBlockList
);
int32_t
tscCopyDataBlockToPayload
(
SSqlObj
*
pSql
,
STableDataBlocks
*
pDataBlock
);
void
tscFreeUnusedDataBlocks
(
S
DataBlockList
*
p
List
);
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
S
DataBlockList
*
pDataList
);
int32_t
tscGetDataBlockFromList
(
void
*
pHashList
,
S
DataBlockList
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
void
tscFreeUnusedDataBlocks
(
S
Array
*
pDataBlock
List
);
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
S
Array
*
pDataList
);
int32_t
tscGetDataBlockFromList
(
void
*
pHashList
,
S
Array
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
char
*
tableId
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
* for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
*
...
...
src/client/inc/tsclient.h
浏览文件 @
febd427b
...
...
@@ -183,11 +183,11 @@ typedef struct STableDataBlocks {
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SDataBlockList
{
// todo remove
uint32_t
nSize
;
uint32_t
nAlloc
;
STableDataBlocks
**
pData
;
}
SDataBlockList
;
//
typedef struct SDataBlockList { // todo remove
//
uint32_t nSize;
//
uint32_t nAlloc;
//
STableDataBlocks **pData;
//
} SDataBlockList;
typedef
struct
SQueryInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
...
...
@@ -238,8 +238,7 @@ typedef struct {
void
*
pTableList
;
// referred table involved in sql
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
SDataBlockList
*
pDataBlocks
;
// submit data blocks after parsing sql
SArray
*
pDataBlocks
;
// SArray<STableDataBlocks*> submit data blocks after parsing sql
}
SSqlCmd
;
typedef
struct
SResRec
{
...
...
src/client/src/TSDBJNIConnector.c
浏览文件 @
febd427b
...
...
@@ -565,6 +565,11 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
sql
=
(
char
*
)(
*
env
)
->
GetStringUTFChars
(
env
,
jsql
,
NULL
);
}
if
(
topic
==
NULL
||
sql
==
NULL
)
{
jniTrace
(
"jobj:%p, invalid argument: topic or sql is NULL"
,
jobj
);
return
sub
;
}
TAOS_SUB
*
tsub
=
taos_subscribe
(
taos
,
(
int
)
restart
,
topic
,
sql
,
NULL
,
NULL
,
jinterval
);
sub
=
(
jlong
)
tsub
;
...
...
@@ -574,8 +579,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
jniTrace
(
"jobj:%p, successfully subscribe: topic: %s"
,
jobj
,
topic
);
}
if
(
topic
!=
NULL
)
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
jtopic
,
topic
);
if
(
sql
!=
NULL
)
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
jsql
,
sql
);
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
jtopic
,
topic
);
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
jsql
,
sql
);
return
sub
;
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
febd427b
...
...
@@ -123,7 +123,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
}
}
if
(
n
==
0
)
{
if
(
n
==
0
||
pCtx
==
NULL
)
{
free
(
pTagCtx
);
}
else
{
pCtx
->
tagInfo
.
pTagCtxList
=
pTagCtx
;
...
...
src/client/src/tscParseInsert.c
浏览文件 @
febd427b
...
...
@@ -989,13 +989,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
int
validateTableName
(
char
*
tblName
,
int
len
,
SSQLToken
*
psTblToken
)
{
char
buf
[
TSDB_TABLE_ID_LEN
]
=
{
0
};
tstrncpy
(
buf
,
tblName
,
sizeof
(
buf
));
tstrncpy
(
psTblToken
->
z
,
tblName
,
TSDB_TABLE_ID_LEN
);
psTblToken
->
n
=
len
;
psTblToken
->
type
=
TK_ID
;
psTblToken
->
z
=
buf
;
tSQLGetToken
(
buf
,
&
psTblToken
->
type
);
tSQLGetToken
(
psTblToken
->
z
,
&
psTblToken
->
type
);
return
tscValidateName
(
psTblToken
);
}
...
...
@@ -1042,8 +1040,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if
(
NULL
==
pCmd
->
pTableList
)
{
pCmd
->
pTableList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
pSql
->
cmd
.
pDataBlocks
=
tscCreateBlockArrayList
();
pCmd
->
pDataBlocks
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
NULL
==
pCmd
->
pTableList
||
NULL
==
pSql
->
cmd
.
pDataBlocks
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
...
...
@@ -1081,7 +1078,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
pCmd
->
curSql
=
sToken
.
z
;
char
buf
[
TSDB_TABLE_ID_LEN
];
SSQLToken
sTblToken
;
sTblToken
.
z
=
buf
;
// Check if the table name available or not
if
(
validateTableName
(
sToken
.
z
,
sToken
.
n
,
&
sTblToken
)
!=
TSDB_CODE_SUCCESS
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"table name invalid"
,
sToken
.
z
);
...
...
@@ -1174,7 +1173,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_error
;
}
t
scAppendDataBlock
(
pCmd
->
pDataBlocks
,
pDataBlock
);
t
aosArrayPush
(
pCmd
->
pDataBlocks
,
&
pDataBlock
);
strcpy
(
pDataBlock
->
filename
,
fname
);
}
else
if
(
sToken
.
type
==
TK_LP
)
{
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
...
...
@@ -1262,7 +1261,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_clean
;
}
if
(
pCmd
->
pDataBlocks
->
nSize
>
0
)
{
// merge according to vgId
if
(
taosArrayGetSize
(
pCmd
->
pDataBlocks
)
>
0
)
{
// merge according to vgId
if
((
code
=
tscMergeTableDataBlocks
(
pSql
,
pCmd
->
pDataBlocks
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -1372,8 +1371,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return
code
;
}
// the pDataBlock is different from the pTableDataBlocks
STableDataBlocks
*
pDataBlock
=
pCmd
->
pDataBlocks
->
pData
[
0
];
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
0
);
if
((
code
=
tscCopyDataBlockToPayload
(
pSql
,
pDataBlock
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1404,15 +1402,15 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t
rowSize
=
tinfo
.
rowSize
;
pCmd
->
pDataBlocks
=
t
scCreateBlockArrayList
(
);
pCmd
->
pDataBlocks
=
t
aosArrayInit
(
4
,
POINTER_BYTES
);
STableDataBlocks
*
pTableDataBlock
=
NULL
;
int32_t
ret
=
tscCreateDataBlock
(
TSDB_PAYLOAD_SIZE
,
rowSize
,
sizeof
(
SSubmitBlk
),
pTableMetaInfo
->
name
,
pTableMeta
,
&
pTableDataBlock
);
int32_t
ret
=
tscCreateDataBlock
(
TSDB_PAYLOAD_SIZE
,
rowSize
,
sizeof
(
SSubmitBlk
),
pTableMetaInfo
->
name
,
pTableMeta
,
&
pTableDataBlock
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
return
ret
;
}
t
scAppendDataBlock
(
pCmd
->
pDataBlocks
,
pTableDataBlock
);
t
aosArrayPush
(
pCmd
->
pDataBlocks
,
&
pTableDataBlock
);
code
=
tscAllocateMemIfNeed
(
pTableDataBlock
,
rowSize
,
&
maxRows
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
return
-
1
;
...
...
@@ -1446,7 +1444,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
return
-
code
;
}
pTableDataBlock
=
pCmd
->
pDataBlocks
->
pData
[
0
]
;
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
0
)
;
pTableDataBlock
->
size
=
sizeof
(
SSubmitBlk
);
pTableDataBlock
->
rowSize
=
tinfo
.
rowSize
;
...
...
@@ -1483,13 +1481,14 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
int32_t
affected_rows
=
0
;
assert
(
pCmd
->
dataSourceType
==
DATA_FROM_DATA_FILE
&&
pCmd
->
pDataBlocks
!=
NULL
);
S
DataBlockList
*
pDataBlockList
=
pCmd
->
pDataBlocks
;
S
Array
*
pDataBlockList
=
pCmd
->
pDataBlocks
;
pCmd
->
pDataBlocks
=
NULL
;
char
path
[
PATH_MAX
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pDataBlockList
->
nSize
;
++
i
)
{
pDataBlock
=
pDataBlockList
->
pData
[
i
];
size_t
size
=
taosArrayGetSize
(
pDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
pDataBlock
=
taosArrayGetP
(
pDataBlockList
,
i
);
if
(
pDataBlock
==
NULL
)
{
continue
;
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
febd427b
...
...
@@ -331,8 +331,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
binded
=
pCmd
->
batchSize
/
2
;
}
for
(
int32_t
i
=
0
;
i
<
pCmd
->
pDataBlocks
->
nSize
;
++
i
)
{
STableDataBlocks
*
pBlock
=
pCmd
->
pDataBlocks
->
pData
[
i
];
size_t
size
=
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableDataBlocks
*
pBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
i
);
uint32_t
totalDataSize
=
pBlock
->
size
-
sizeof
(
SSubmitBlk
);
uint32_t
dataSize
=
totalDataSize
/
alloced
;
assert
(
dataSize
*
alloced
==
totalDataSize
);
...
...
@@ -370,8 +371,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
return
TSDB_CODE_SUCCESS
;
}
for
(
int32_t
i
=
0
;
i
<
pCmd
->
pDataBlocks
->
nSize
;
++
i
)
{
STableDataBlocks
*
pBlock
=
pCmd
->
pDataBlocks
->
pData
[
i
];
size_t
total
=
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
STableDataBlocks
*
pBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
i
);
uint32_t
totalDataSize
=
pBlock
->
size
-
sizeof
(
SSubmitBlk
);
pBlock
->
size
+=
totalDataSize
/
alloced
;
...
...
@@ -395,8 +397,10 @@ static int insertStmtReset(STscStmt* pStmt) {
SSqlCmd
*
pCmd
=
&
pStmt
->
pSql
->
cmd
;
if
(
pCmd
->
batchSize
>
2
)
{
int32_t
alloced
=
(
pCmd
->
batchSize
+
1
)
/
2
;
for
(
int32_t
i
=
0
;
i
<
pCmd
->
pDataBlocks
->
nSize
;
++
i
)
{
STableDataBlocks
*
pBlock
=
pCmd
->
pDataBlocks
->
pData
[
i
];
size_t
size
=
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableDataBlocks
*
pBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
i
);
uint32_t
totalDataSize
=
pBlock
->
size
-
sizeof
(
SSubmitBlk
);
pBlock
->
size
=
sizeof
(
SSubmitBlk
)
+
totalDataSize
/
alloced
;
...
...
@@ -423,15 +427,15 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
assert
(
pCmd
->
numOfClause
==
1
);
if
(
pCmd
->
pDataBlocks
->
nSize
>
0
)
{
if
(
taosArrayGetSize
(
pCmd
->
pDataBlocks
)
>
0
)
{
// merge according to vgid
int
code
=
tscMergeTableDataBlocks
(
stmt
->
pSql
,
pCmd
->
pDataBlocks
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
STableDataBlocks
*
pDataBlock
=
pCmd
->
pDataBlocks
->
pData
[
0
]
;
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
0
)
;
code
=
tscCopyDataBlockToPayload
(
stmt
->
pSql
,
pDataBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
febd427b
...
...
@@ -1483,6 +1483,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
const
char
*
msg6
=
"function applied to tags not allowed"
;
const
char
*
msg7
=
"normal table can not apply this function"
;
const
char
*
msg8
=
"multi-columns selection does not support alias column name"
;
const
char
*
msg9
=
"invalid function"
;
switch
(
optr
)
{
case
TK_COUNT
:
{
...
...
@@ -1683,7 +1684,9 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
bool
requireAllFields
=
(
pItem
->
pNode
->
pParam
==
NULL
);
int16_t
functionID
=
0
;
changeFunctionID
(
optr
,
&
functionID
);
if
(
changeFunctionID
(
optr
,
&
functionID
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg9
);
}
if
(
!
requireAllFields
)
{
if
(
pItem
->
pNode
->
pParam
->
nExpr
<
1
)
{
...
...
@@ -3183,10 +3186,22 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) {
*
* However, columnA < 4+12 is valid
*/
if
((
pLeft
->
nSQLOptr
>=
TK_COUNT
&&
pLeft
->
nSQLOptr
<=
TK_AVG_IRATE
)
||
(
pRight
->
nSQLOptr
>=
TK_COUNT
&&
pRight
->
nSQLOptr
<=
TK_AVG_IRATE
)
||
(
pLeft
->
nSQLOptr
>=
TK_BOOL
&&
pLeft
->
nSQLOptr
<=
TK_BINARY
&&
pRight
->
nSQLOptr
>=
TK_BOOL
&&
pRight
->
nSQLOptr
<=
TK_BINARY
))
{
if
(
pLeft
->
nSQLOptr
>=
TK_COUNT
&&
pLeft
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
return
false
;
}
if
(
pRight
==
NULL
)
{
return
true
;
}
if
(
pRight
->
nSQLOptr
>=
TK_COUNT
&&
pRight
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
return
false
;
}
if
(
pLeft
->
nSQLOptr
>=
TK_BOOL
&&
pLeft
->
nSQLOptr
<=
TK_BINARY
&&
pRight
->
nSQLOptr
>=
TK_BOOL
&&
pRight
->
nSQLOptr
<=
TK_BINARY
)
{
return
false
;
}
...
...
@@ -3759,13 +3774,17 @@ static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* p
if
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
SColumnIndex
index
=
{
0
};
getColumnIndexByName
(
&
pCondExpr
->
pJoinExpr
->
pLeft
->
colInfo
,
pQueryInfo
,
&
index
);
if
(
getColumnIndexByName
(
&
pCondExpr
->
pJoinExpr
->
pLeft
->
colInfo
,
pQueryInfo
,
&
index
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p: invalid column name (left)"
,
pQueryInfo
);
}
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
index
.
columnIndex
=
index
.
columnIndex
-
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
tscColumnListInsert
(
pTableMetaInfo
->
tagColList
,
&
index
);
getColumnIndexByName
(
&
pCondExpr
->
pJoinExpr
->
pRight
->
colInfo
,
pQueryInfo
,
&
index
);
if
(
getColumnIndexByName
(
&
pCondExpr
->
pJoinExpr
->
pRight
->
colInfo
,
pQueryInfo
,
&
index
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p: invalid column name (right)"
,
pQueryInfo
);
}
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
index
.
columnIndex
=
index
.
columnIndex
-
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
...
...
@@ -4463,7 +4482,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
(
pCmd
->
payload
+
tsRpcHeadSize
)
;
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
sid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
uid
);
...
...
src/client/src/tscServer.c
浏览文件 @
febd427b
...
...
@@ -176,18 +176,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char
*
pMsg
=
rpcMallocCont
(
pCmd
->
payloadLen
);
if
(
NULL
==
pMsg
)
{
tscError
(
"%p msg:%s malloc fail"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
tscError
(
"%p msg:%s malloc fail
ed
"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
if
(
pSql
->
cmd
.
command
<
TSDB_SQL_MGMT
)
{
memcpy
(
pMsg
,
pSql
->
cmd
.
payload
+
tsRpcHeadSize
,
pSql
->
cmd
.
payloadLen
);
}
else
{
// set the mgmt ip list
if
(
pSql
->
cmd
.
command
>=
TSDB_SQL_MGMT
)
{
pSql
->
ipList
=
tscMgmtIpSet
;
memcpy
(
pMsg
,
pSql
->
cmd
.
payload
,
pSql
->
cmd
.
payloadLen
);
}
// tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]
);
memcpy
(
pMsg
,
pSql
->
cmd
.
payload
,
pSql
->
cmd
.
payloadLen
);
SRpcMsg
rpcMsg
=
{
.
msgType
=
pSql
->
cmd
.
msgType
,
...
...
@@ -204,7 +202,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcIpSet
*
pIpSet
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
rpcMsg
->
handle
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"%p sql is already released"
,
pSql
->
signature
);
tscError
(
"%p sql is already released"
,
pSql
);
return
;
}
...
...
@@ -222,8 +220,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
if
(
pQueryInfo
!=
NULL
&&
pQueryInfo
->
type
==
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
tscTrace
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d
pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pObj
,
pObj
->
signature
);
tscTrace
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d
type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
tscFreeSqlObj
(
pSql
);
rpcFreeCont
(
rpcMsg
->
pCont
);
...
...
@@ -449,18 +447,11 @@ void tscKillSTableQuery(SSqlObj *pSql) {
}
int
tscBuildFetchMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
char
*
pMsg
,
*
pStart
;
pStart
=
pSql
->
cmd
.
payload
+
tsRpcHeadSize
;
pMsg
=
pStart
;
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pMsg
;
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pSql
->
cmd
.
payload
;
pRetrieveMsg
->
qhandle
=
htobe64
(
pSql
->
res
.
qhandle
);
pMsg
+=
sizeof
(
pSql
->
res
.
qhandle
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
pMsg
+=
sizeof
(
pQueryInfo
->
type
);
// todo valid the vgroupId at the client side
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -474,12 +465,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
}
pMsg
+=
sizeof
(
SRetrieveTableMsg
);
pRetrieveMsg
->
header
.
contLen
=
htonl
(
pSql
->
cmd
.
payloadLen
);
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_FETCH
;
pRetrieveMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -487,30 +478,30 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
char
*
pMsg
=
pSql
->
cmd
.
payload
+
tsRpcHeadSize
;
char
*
pMsg
=
pSql
->
cmd
.
payload
;
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pSql
->
cmd
.
payloadLen
-
sizeof
(
SMsgDesc
);
int32_t
vgId
=
pTableMeta
->
vgroupInfo
.
vgId
;
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pMsg
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
//todo set the right number of vnodes
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always one vnode
pMsg
+=
sizeof
(
SMsgDesc
);
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)
pMsg
;
int32_t
vgId
=
pTableMeta
->
vgroupInfo
.
vgId
;
pShellMsg
->
header
.
vgId
=
htonl
(
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
pShellMsg
->
numOfBlocks
=
htonl
(
pSql
->
cmd
.
numOfTablesInSubmit
);
// number of
meter
s to be inserted
pShellMsg
->
numOfBlocks
=
htonl
(
pSql
->
cmd
.
numOfTablesInSubmit
);
// number of
table
s to be inserted
// pSql->cmd.payloadLen is set during copying data into payload
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
tscSetDnodeIpList
(
pSql
,
&
pTableMeta
->
vgroupInfo
);
tscTrace
(
"%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d"
,
pSql
,
vgId
,
htonl
(
pMsgDesc
->
numOfVnodes
),
pSql
->
ipList
.
numOfIps
);
tscTrace
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d"
,
pSql
,
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
ipList
.
numOfIps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -620,9 +611,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
-
1
;
}
char
*
pStart
=
pCmd
->
payload
+
tsRpcHeadSize
;
SQueryTableMsg
*
pQueryMsg
=
(
SQueryTableMsg
*
)
pStart
;
SQueryTableMsg
*
pQueryMsg
=
(
SQueryTableMsg
*
)
pCmd
->
payload
;
int32_t
numOfTags
=
taosArrayGetSize
(
pTableMetaInfo
->
tagColList
);
...
...
@@ -821,7 +810,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
// compressed ts block
pQueryMsg
->
tsOffset
=
htonl
(
pMsg
-
p
Start
);
pQueryMsg
->
tsOffset
=
htonl
(
pMsg
-
p
Cmd
->
payload
);
int32_t
tsLen
=
0
;
int32_t
numOfBlocks
=
0
;
...
...
@@ -830,8 +819,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
pBlockInfo
!=
NULL
);
// this query should not be sent
// todo refactor
fseek
(
pQueryInfo
->
tsBuf
->
f
,
pBlockInfo
->
offset
,
SEEK_SET
);
fread
(
pMsg
,
pBlockInfo
->
compLen
,
1
,
pQueryInfo
->
tsBuf
->
f
);
if
(
fseek
(
pQueryInfo
->
tsBuf
->
f
,
pBlockInfo
->
offset
,
SEEK_SET
)
!=
0
)
{
int
code
=
TAOS_SYSTEM_ERROR
(
ferror
(
pQueryInfo
->
tsBuf
->
f
));
tscError
(
"%p: fseek failed: %s"
,
pSql
,
tstrerror
(
code
));
return
code
;
}
if
(
fread
(
pMsg
,
pBlockInfo
->
compLen
,
1
,
pQueryInfo
->
tsBuf
->
f
)
!=
pBlockInfo
->
compLen
)
{
int
code
=
TAOS_SYSTEM_ERROR
(
ferror
(
pQueryInfo
->
tsBuf
->
f
));
tscError
(
"%p: fread didn't return expected data: %s"
,
pSql
,
tstrerror
(
code
));
return
code
;
}
pMsg
+=
pBlockInfo
->
compLen
;
tsLen
=
pBlockInfo
->
compLen
;
...
...
@@ -844,7 +841,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
tsOrder
=
htonl
(
pQueryInfo
->
tsBuf
->
tsOrder
);
}
int32_t
msgLen
=
pMsg
-
p
Start
;
int32_t
msgLen
=
pMsg
-
p
Cmd
->
payload
;
tscTrace
(
"%p msg built success,len:%d bytes"
,
pSql
,
msgLen
);
pCmd
->
payloadLen
=
msgLen
;
...
...
@@ -1286,10 +1283,12 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_UPDATE_TAG_VAL
;
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
(
pCmd
->
payload
+
tsRpcHeadSize
)
;
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pCmd
->
payloadLen
=
htonl
(
pUpdateMsg
->
head
.
contLen
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tscSetDnodeIpList
(
pSql
,
&
pTableMetaInfo
->
pTableMeta
->
vgroupInfo
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1552,150 +1551,6 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
//}
int
tscBuildSTableVgroupMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
#if 0
SSuperTableMetaMsg *pMetaMsg;
char * pMsg, *pStart;
int msgLen = 0;
int tableIndex = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STagCond *pTagCond = &pQueryInfo->tagCond;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for metric meter msg", pSql);
return -1;
}
pStart = pCmd->payload + tsRpcHeadSize;
pMsg = pStart;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead);
pMetaMsg = (SSuperTableMetaMsg *)pMsg;
pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
pMsg += sizeof(SSuperTableMetaMsg);
int32_t offset = pMsg - (char *)pMetaMsg;
pMetaMsg->join = htonl(offset);
// todo refactor
pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
pMsg += sizeof(int16_t);
memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
pMsg += sizeof(int16_t);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
offset = pMsg - (char *)pMetaMsg;
pMetaMsg->metaElem[i] = htonl(offset);
SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
pMsg += sizeof(SSuperTableMetaElemMsg);
// convert to unicode before sending to mnode for metric query
int32_t condLen = 0;
if (pTagCond->numOfTagCond > 0) {
SCond *pCond = tsGetSTableQueryCond(pTagCond, uid);
if (pCond != NULL && pCond->cond != NULL) {
condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) {
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCond(pTagCond, uid));
return 0;
}
}
}
pElem->condLen = htonl(condLen);
offset = pMsg - (char *)pMetaMsg;
pElem->cond = htonl(offset);
pMsg += condLen * TSDB_NCHAR_SIZE;
pElem->rel = htons(pTagCond->relType);
if (pTagCond->tbnameCond.uid == uid) {
offset = pMsg - (char *)pMetaMsg;
pElem->tableCond = htonl(offset);
uint32_t len = 0;
if (pTagCond->tbnameCond.cond != NULL) {
len = strlen(pTagCond->tbnameCond.cond);
memcpy(pMsg, pTagCond->tbnameCond.cond, len);
}
pElem->tableCondLen = htonl(len);
pMsg += len;
}
SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
pElem->orderType = 0;
pElem->orderIndex = 0;
pElem->numOfGroupCols = 0;
} else {
pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
}
if (pGroupby->numOfGroupCols != 0) {
pElem->orderIndex = htons(pGroupby->orderIndex);
pElem->orderType = htons(pGroupby->orderType);
offset = pMsg - (char *)pMetaMsg;
pElem->groupbyTagColumnList = htonl(offset);
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
SColIndex *pDestCol = (SColIndex *)pMsg;
pDestCol->colIdxInBuf = 0;
pDestCol->colIndex = htons(pCol->colIndex);
pDestCol->colId = htons(pDestCol->colId);
pDestCol->flag = htons(pDestCol->flag);
strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
pMsg += sizeof(SColIndex);
}
}
}
strcpy(pElem->tableId, pTableMetaInfo->name);
pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
int16_t len = pMsg - (char *)pElem;
pElem->elemLen = htons(len); // redundant data for integrate check
}
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
assert(msgLen + minMsgSize() <= size);
#endif
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
char
*
pMsg
=
pCmd
->
payload
;
...
...
@@ -1775,7 +1630,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
||
pMetaMsg
->
numOfTags
<
0
)
{
if
(
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
)
{
tscError
(
"invalid numOfTags:%d"
,
pMetaMsg
->
numOfTags
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
...
...
@@ -1795,7 +1650,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for
(
int
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
pSchema
->
colId
=
htons
(
pSchema
->
colId
);
if
(
pSchema
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
assert
(
i
==
0
);
}
assert
(
pSchema
->
type
>=
TSDB_DATA_TYPE_BOOL
&&
pSchema
->
type
<=
TSDB_DATA_TYPE_NCHAR
);
pSchema
++
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
febd427b
...
...
@@ -133,8 +133,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return
NULL
;
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize
=
tsRpcHeadSize
+
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
);
tsInsertHeadSize
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
);
return
pSql
;
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
febd427b
...
...
@@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
getTmpfilePath
(
"join-"
,
pSupporter
->
path
);
pSupporter
->
f
=
fopen
(
pSupporter
->
path
,
"w"
);
// todo handle error
if
(
pSupporter
->
f
==
NULL
)
{
tscError
(
"%p failed to create tmp file:%s, reason:%s"
,
pSql
,
pSupporter
->
path
,
strerror
(
errno
));
}
...
...
@@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
static
int32_t
tscLaunch
SecondPhase
Subqueries
(
SSqlObj
*
pSql
)
{
static
int32_t
tscLaunch
Real
Subqueries
(
SSqlObj
*
pSql
)
{
int32_t
numOfSub
=
0
;
SJoinSupporter
*
pSupporter
=
NULL
;
...
...
@@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert
(
numOfSub
>
0
);
// scan all subquery, if one sub query has only ts, ignore it
tscTrace
(
"%p start to launch secondary subquer
y
, total:%d, only:%d needs to query"
,
pSql
,
pSql
->
numOfSubs
,
numOfSub
);
tscTrace
(
"%p start to launch secondary subquer
ies
, total:%d, only:%d needs to query"
,
pSql
,
pSql
->
numOfSubs
,
numOfSub
);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState
*
pState
=
pSupporter
->
pState
;
...
...
@@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport
freeJoinSubqueryObj
(
pParentSql
);
}
else
{
updateQueryTimeRange
(
pParentQueryInfo
,
&
win
);
tscLaunch
SecondPhase
Subqueries
(
pParentSql
);
tscLaunch
Real
Subqueries
(
pParentSql
);
}
}
...
...
@@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo
*
pPQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentSql
->
cmd
,
pParentSql
->
cmd
.
clauseIndex
);
updateQueryTimeRange
(
pPQueryInfo
,
&
win
);
tscLaunch
SecondPhase
Subqueries
(
pParentSql
);
tscLaunch
Real
Subqueries
(
pParentSql
);
}
static
void
joinRetrieveFinalResCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
...
...
@@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static
SSqlObj
*
tscCreateSqlObjForSubquery
(
SSqlObj
*
pSql
,
SRetrieveSupport
*
trsupport
,
SSqlObj
*
prevSqlObj
);
// todo merge with callback
int32_t
tscLaunchJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
SJoinSupporter
*
pSupporter
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
...
...
@@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
pState
->
numOfTotal
=
pQueryInfo
->
numOfTables
;
pState
->
numOfRemain
=
pState
->
numOfTotal
;
tscTrace
(
"%p start
launch
subquery, total:%d"
,
pSql
,
pQueryInfo
->
numOfTables
);
tscTrace
(
"%p start subquery, total:%d"
,
pSql
,
pQueryInfo
->
numOfTables
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
SJoinSupporter
*
pSupporter
=
tscCreateJoinSupporter
(
pSql
,
pState
,
i
);
...
...
@@ -1378,6 +1378,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if
(
ret
!=
0
)
{
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscQueueAsyncRes
(
pSql
);
tfree
(
pMemoryBuf
);
return
ret
;
}
...
...
@@ -1729,7 +1730,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
pRes
->
numOfRows
,
pQueryInfo
->
groupbyExpr
.
orderType
);
if
(
ret
!=
0
)
{
// set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_NO_DISKSPACE
);
pthread_mutex_unlock
(
&
trsupport
->
queryMutex
);
}
else
if
(
pRes
->
completed
)
{
tscAllDataRetrievedFromDnode
(
trsupport
,
pSql
);
...
...
@@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static
void
multiVnodeInsertFinalize
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SInsertSupporter
*
pSupporter
=
(
SInsertSupporter
*
)
param
;
SSqlObj
*
pParentObj
=
pSupporter
->
pSql
;
SSqlCmd
*
pParentCmd
=
&
pParentObj
->
cmd
;
SSubqueryState
*
pState
=
pSupporter
->
pState
;
// record the total inserted rows
...
...
@@ -1864,7 +1862,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj
->
res
.
code
=
pSql
->
res
.
code
;
}
// it is not the initial sqlObj, free it
taos_free_result
(
tres
);
tfree
(
pSupporter
);
...
...
@@ -1876,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// release data block data
tfree
(
pState
);
pParentCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pParentCmd
->
pDataBlocks
);
//
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj
->
fp
=
pParentObj
->
fetchFp
;
...
...
@@ -1889,12 +1886,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SDataBlockList
*
pDataBlocks
=
pCmd
->
pDataBlocks
;
pSql
->
pSubs
=
calloc
(
pDataBlocks
->
nSize
,
POINTER_BYTES
);
pSql
->
numOfSubs
=
pDataBlocks
->
nSize
;
assert
(
pDataBlocks
->
nSize
>
0
);
tscTrace
(
"%p submit data to %d vnode(s)"
,
pSql
,
pDataBlocks
->
nSize
);
size_t
size
=
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
assert
(
size
>
0
);
pSql
->
pSubs
=
calloc
(
size
,
POINTER_BYTES
);
pSql
->
numOfSubs
=
size
;
tscTrace
(
"%p submit data to %zu vnode(s)"
,
pSql
,
size
);
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
pSql
->
numOfSubs
;
...
...
@@ -1920,12 +1919,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew
->
fetchFp
=
pNew
->
fp
;
pSql
->
pSubs
[
numOfSub
]
=
pNew
;
pRes
->
code
=
tscCopyDataBlockToPayload
(
pNew
,
pDataBlocks
->
pData
[
numOfSub
++
]);
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
numOfSub
);
pRes
->
code
=
tscCopyDataBlockToPayload
(
pNew
,
pTableDataBlock
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p sub:%p create subObj success. orderOfSub:%d"
,
pSql
,
pNew
,
numOfSub
);
numOfSub
++
;
}
else
{
tscTrace
(
"%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%
d
, code:%s"
,
pSql
,
numOfSub
,
pDataBlocks
->
nS
ize
,
tstrerror
(
pRes
->
code
));
tscTrace
(
"%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%
zu
, code:%s"
,
pSql
,
numOfSub
,
s
ize
,
tstrerror
(
pRes
->
code
));
goto
_error
;
}
}
...
...
@@ -1942,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace
(
"%p sub:%p launch sub insert, orderOfSub:%d"
,
pSql
,
pSub
,
j
);
tscProcessSql
(
pSub
);
}
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
return
TSDB_CODE_SUCCESS
;
_error:
...
...
src/client/src/tscUtil.c
浏览文件 @
febd427b
...
...
@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "taosmsg.h"
...
...
@@ -29,6 +27,8 @@
#include "ttimer.h"
#include "ttokendef.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static
void
freeQueryInfoImpl
(
SQueryInfo
*
pQueryInfo
);
static
void
clearAllTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
);
...
...
@@ -428,48 +428,18 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return
param
;
}
SDataBlockList
*
tscCreateBlockArrayList
()
{
const
int32_t
DEFAULT_INITIAL_NUM_OF_BLOCK
=
16
;
SDataBlockList
*
pDataBlockArrayList
=
calloc
(
1
,
sizeof
(
SDataBlockList
));
if
(
pDataBlockArrayList
==
NULL
)
{
void
*
tscDestroyBlockArrayList
(
SArray
*
pDataBlockList
)
{
if
(
pDataBlockList
==
NULL
)
{
return
NULL
;
}
pDataBlockArrayList
->
nAlloc
=
DEFAULT_INITIAL_NUM_OF_BLOCK
;
pDataBlockArrayList
->
pData
=
calloc
(
1
,
POINTER_BYTES
*
pDataBlockArrayList
->
nAlloc
);
if
(
pDataBlockArrayList
->
pData
==
NULL
)
{
free
(
pDataBlockArrayList
);
return
NULL
;
}
return
pDataBlockArrayList
;
}
void
tscAppendDataBlock
(
SDataBlockList
*
pList
,
STableDataBlocks
*
pBlocks
)
{
if
(
pList
->
nSize
>=
pList
->
nAlloc
)
{
pList
->
nAlloc
=
(
pList
->
nAlloc
)
<<
1U
;
pList
->
pData
=
realloc
(
pList
->
pData
,
POINTER_BYTES
*
(
size_t
)
pList
->
nAlloc
);
// reset allocated memory
memset
(
pList
->
pData
+
pList
->
nSize
,
0
,
POINTER_BYTES
*
(
pList
->
nAlloc
-
pList
->
nSize
));
size_t
size
=
taosArrayGetSize
(
pDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
d
=
taosArrayGetP
(
pDataBlockList
,
i
);
tscDestroyDataBlock
(
d
);
}
pList
->
pData
[
pList
->
nSize
++
]
=
pBlocks
;
}
void
*
tscDestroyBlockArrayList
(
SDataBlockList
*
pList
)
{
if
(
pList
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pList
->
nSize
;
i
++
)
{
tscDestroyDataBlock
(
pList
->
pData
[
i
]);
}
tfree
(
pList
->
pData
);
tfree
(
pList
);
taosArrayDestroy
(
pDataBlockList
);
return
NULL
;
}
...
...
@@ -484,7 +454,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if
(
pTableMetaInfo
->
pTableMeta
!=
pDataBlock
->
pTableMeta
)
{
strcpy
(
pTableMetaInfo
->
name
,
pDataBlock
->
tableId
);
tstrncpy
(
pTableMetaInfo
->
name
,
pDataBlock
->
tableId
,
sizeof
(
pTableMetaInfo
->
name
)
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
false
);
pTableMetaInfo
->
pTableMeta
=
taosCacheTransfer
(
tscCacheHandle
,
(
void
**
)
&
pDataBlock
->
pTableMeta
);
...
...
@@ -497,31 +467,32 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
* additional space.
*/
int
ret
=
tscAllocPayload
(
pCmd
,
pDataBlock
->
nAllocS
ize
+
100
);
int
ret
=
tscAllocPayload
(
pCmd
,
pDataBlock
->
s
ize
+
100
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
return
ret
;
}
memcpy
(
pCmd
->
payload
,
pDataBlock
->
pData
,
pDataBlock
->
nAllocSize
);
assert
(
pDataBlock
->
size
<=
pDataBlock
->
nAllocSize
);
memcpy
(
pCmd
->
payload
,
pDataBlock
->
pData
,
pDataBlock
->
size
);
/*
* the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size
*/
pCmd
->
payloadLen
=
pDataBlock
->
nAllocSize
-
tsRpcHeadS
ize
;
pCmd
->
payloadLen
=
pDataBlock
->
s
ize
;
assert
(
pCmd
->
allocSize
>=
pCmd
->
payloadLen
+
tsRpcHeadSize
+
100
&&
pCmd
->
payloadLen
>
0
);
assert
(
pCmd
->
allocSize
>=
pCmd
->
payloadLen
+
100
&&
pCmd
->
payloadLen
>
0
);
return
TSDB_CODE_SUCCESS
;
}
void
tscFreeUnusedDataBlocks
(
SDataBlockList
*
pList
)
{
/* release additional memory consumption */
for
(
int32_t
i
=
0
;
i
<
pList
->
nSize
;
++
i
)
{
STableDataBlocks
*
pDataBlock
=
pList
->
pData
[
i
];
pDataBlock
->
pData
=
realloc
(
pDataBlock
->
pData
,
pDataBlock
->
size
);
pDataBlock
->
nAllocSize
=
(
uint32_t
)
pDataBlock
->
size
;
}
}
//
void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
//
/* release additional memory consumption */
//
for (int32_t i = 0; i < pList->nSize; ++i) {
//
STableDataBlocks* pDataBlock = pList->pData[i];
//
pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
//
pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
//
}
//
}
/**
* create the in-memory buffer for each table to keep the submitted data block
...
...
@@ -568,7 +539,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return
TSDB_CODE_SUCCESS
;
}
int32_t
tscGetDataBlockFromList
(
void
*
pHashList
,
S
DataBlockList
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
int32_t
tscGetDataBlockFromList
(
void
*
pHashList
,
S
Array
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
char
*
tableId
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
)
{
*
dataBlocks
=
NULL
;
...
...
@@ -585,7 +556,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
}
taosHashPut
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
int64_t
),
(
char
*
)
dataBlocks
,
POINTER_BYTES
);
t
scAppendDataBlock
(
pDataBlockList
,
*
dataBlocks
);
t
aosArrayPush
(
pDataBlockList
,
dataBlocks
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -634,14 +605,15 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
return
len
;
}
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
S
DataBlockList
*
pTableDataBlockList
)
{
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
S
Array
*
pTableDataBlockList
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
S
DataBlockList
*
pVnodeDataBlockList
=
tscCreateBlockArrayList
(
);
S
Array
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
pTableDataBlockList
->
nSize
;
++
i
)
{
STableDataBlocks
*
pOneTableBlock
=
pTableDataBlockList
->
pData
[
i
];
size_t
total
=
taosArrayGetSize
(
pTableDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
STableDataBlocks
*
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
i
);
STableDataBlocks
*
dataBuf
=
NULL
;
...
...
@@ -679,10 +651,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
tscSortRemoveDataBlockDupRows
(
pOneTableBlock
);
char
*
e
=
(
char
*
)
pBlocks
->
data
+
pOneTableBlock
->
rowSize
*
(
pBlocks
->
numOfRows
-
1
);
char
*
e
key
=
(
char
*
)
pBlocks
->
data
+
pOneTableBlock
->
rowSize
*
(
pBlocks
->
numOfRows
-
1
);
tscTrace
(
"%p tableId:%s, sid:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pSql
,
pOneTableBlock
->
tableId
,
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
GET_INT64_VAL
(
pBlocks
->
data
),
GET_INT64_VAL
(
e
));
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
GET_INT64_VAL
(
pBlocks
->
data
),
GET_INT64_VAL
(
e
key
));
int32_t
len
=
pBlocks
->
numOfRows
*
(
pOneTableBlock
->
rowSize
+
sizeof
(
int32_t
)
*
2
);
...
...
@@ -704,7 +676,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
// free the table data blocks;
pCmd
->
pDataBlocks
=
pVnodeDataBlockList
;
tscFreeUnusedDataBlocks
(
pCmd
->
pDataBlocks
);
//
tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup
(
pVnodeDataBlockHashList
);
return
TSDB_CODE_SUCCESS
;
...
...
src/inc/taosdef.h
浏览文件 @
febd427b
...
...
@@ -244,7 +244,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE
(TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_PAYLOAD_SIZE
TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
...
...
src/inc/tsdb.h
浏览文件 @
febd427b
...
...
@@ -105,8 +105,9 @@ typedef struct {
void
tsdbClearTableCfg
(
STableCfg
*
config
);
void
*
tsdbGetTableTagVal
(
TSDB_REPO_T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
);
char
*
tsdbGetTableName
(
TSDB_REPO_T
*
repo
,
const
STableId
*
id
);
void
*
tsdbGetTableTagVal
(
const
void
*
pTable
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
);
char
*
tsdbGetTableName
(
void
*
pTable
);
STableId
tsdbGetTableId
(
void
*
pTable
);
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
);
int
tsdbCreateTable
(
TSDB_REPO_T
*
repo
,
STableCfg
*
pCfg
);
...
...
@@ -176,18 +177,16 @@ typedef struct SQueryRowCond {
TSKEY
ts
;
}
SQueryRowCond
;
typedef
void
*
TsdbPosT
;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param
g
roupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param
tableqinfoG
roupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT
*
tsdbQueryTables
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
g
roupInfo
,
void
*
qinfo
);
TsdbQueryHandleT
*
tsdbQueryTables
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableqinfoG
roupInfo
,
void
*
qinfo
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
...
...
@@ -197,12 +196,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each
* block
* @param
g
roupInfo tableId list.
* @param
tableqinfoG
roupInfo tableId list.
* @return
*/
TsdbQueryHandleT
tsdbQueryLastRow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
g
roupInfo
,
void
*
qinfo
);
TsdbQueryHandleT
tsdbQueryLastRow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableqinfoG
roupInfo
,
void
*
qinfo
);
SArray
*
tsdbGetQueriedTableIdList
(
TsdbQueryHandleT
*
pHandle
);
/**
* get the queried table object list
* @param pHandle
* @return
*/
SArray
*
tsdbGetQueriedTableList
(
TsdbQueryHandleT
*
pHandle
);
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
);
...
...
@@ -247,37 +251,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
*/
SArray
*
tsdbRetrieveDataBlock
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pColumnIdList
);
/**
* todo remove this function later
* @param pQueryHandle
* @param pIdList
* @return
*/
SArray
*
tsdbRetrieveDataRow
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
);
/**
* Get iterator for super tables, of which tags values satisfy the tag filter info
*
* NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7))
* The filter string is sent from client directly.
* The build of the tags filter expression from string is done in the iterator generating function.
*
* @param pCond query condition
* @param pTagFilterStr tag filter info
* @return
*/
TsdbQueryHandleT
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
);
/**
* Get the qualified tables for (super) table query.
* Used to handle the super table projection queries, the last_row query, the group by on normal columns query,
* the interpolation query, and timestamp-comp query for join processing.
*
* @param pQueryHandle
* @return table sid list. the invoker is responsible for the release of this the sid list.
*/
SArray
*
tsdbGetTableList
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
...
...
@@ -287,6 +260,12 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pT
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
/**
* destory the created table group list, which is generated by tag query
* @param pGroupList
*/
void
tsdbDestoryTableGroup
(
STableGroupInfo
*
pGroupList
);
/**
* create the table group result including only one table, used to handle the normal table query
*
...
...
src/kit/shell/src/shellEngine.c
浏览文件 @
febd427b
...
...
@@ -474,7 +474,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) {
}
while
(
row
!=
NULL
);
result
=
NULL
;
taos_free_result
(
tres
);
//
taos_free_result(tres);
fclose
(
fp
);
return
numOfRows
;
...
...
@@ -803,6 +803,7 @@ void source_file(TAOS *con, char *fptr) {
char
*
fname
=
full_path
.
we_wordv
[
0
];
/*
if (access(fname, F_OK) != 0) {
fprintf(stderr, "ERROR: file %s is not exist\n", fptr);
...
...
@@ -810,6 +811,7 @@ void source_file(TAOS *con, char *fptr) {
free(cmd);
return;
}
*/
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
...
...
@@ -849,7 +851,7 @@ void source_file(TAOS *con, char *fptr) {
void
shellGetGrantInfo
(
void
*
con
)
{
return
;
#if 0
char sql[] = "show grants";
TAOS_RES* tres = taos_query(con, sql);
...
...
@@ -900,4 +902,5 @@ void shellGetGrantInfo(void *con) {
}
fprintf(stdout, "\n");
#endif
}
src/kit/shell/src/shellImport.c
浏览文件 @
febd427b
...
...
@@ -73,7 +73,7 @@ static void shellParseDirectory(const char *directoryName, const char *prefix, c
}
int
fileNum
=
0
;
while
(
fscanf
(
fp
,
"%s"
,
fileArray
[
fileNum
++
]))
{
while
(
fscanf
(
fp
,
"%
128
s"
,
fileArray
[
fileNum
++
]))
{
if
(
strcmp
(
fileArray
[
fileNum
-
1
],
shellTablesSQLFile
)
==
0
)
{
fileNum
--
;
}
...
...
@@ -150,9 +150,11 @@ static void shellSourceFile(TAOS *con, char *fptr) {
char
*
fname
=
full_path
.
we_wordv
[
0
];
if
(
fname
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: invalid filename
\n
"
);
free
(
cmd
);
return
;
}
/*
if (access(fname, F_OK) != 0) {
fprintf(stderr, "ERROR: file %s is not exist\n", fptr);
...
...
@@ -168,6 +170,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
free(cmd);
return;
}
*/
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
...
...
src/kit/shell/src/shellLinux.c
浏览文件 @
febd427b
...
...
@@ -162,13 +162,13 @@ void shellReadCommand(TAOS *con, char *command) {
// Read input.
char
c
;
while
(
1
)
{
c
=
getchar
();
c
=
(
char
)
getchar
();
// getchar() return an 'int' value
if
(
c
<
0
)
{
// For UTF-8
int
count
=
countPrefixOnes
(
c
);
utf8_array
[
0
]
=
c
;
for
(
int
k
=
1
;
k
<
count
;
k
++
)
{
c
=
getchar
();
c
=
(
char
)
getchar
();
utf8_array
[
k
]
=
c
;
}
insertChar
(
&
cmd
,
utf8_array
,
count
);
...
...
@@ -214,10 +214,10 @@ void shellReadCommand(TAOS *con, char *command) {
break
;
}
}
else
if
(
c
==
'\033'
)
{
c
=
getchar
();
c
=
(
char
)
getchar
();
switch
(
c
)
{
case
'['
:
c
=
getchar
();
c
=
(
char
)
getchar
();
switch
(
c
)
{
case
'A'
:
// Up arrow
if
(
hist_counter
!=
history
.
hstart
)
{
...
...
@@ -244,35 +244,35 @@ void shellReadCommand(TAOS *con, char *command) {
moveCursorLeft
(
&
cmd
);
break
;
case
'1'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// Home key
positionCursorHome
(
&
cmd
);
}
break
;
case
'2'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// Insert key
}
break
;
case
'3'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// Delete key
deleteChar
(
&
cmd
);
}
break
;
case
'4'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// End key
positionCursorEnd
(
&
cmd
);
}
break
;
case
'5'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// Page up key
}
break
;
case
'6'
:
if
((
c
=
getchar
())
==
'~'
)
{
if
((
c
=
(
char
)
getchar
())
==
'~'
)
{
// Page down key
}
break
;
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
febd427b
...
...
@@ -34,6 +34,7 @@
#include <wordexp.h>
#include "taos.h"
#include "tutil.h"
extern
char
configDir
[];
...
...
@@ -82,7 +83,7 @@ typedef struct DemoArguments {
bool
insert_only
;
char
*
output_file
;
int
mode
;
char
*
datatype
[
MAX_NUM_DATATYPE
];
char
*
datatype
[
MAX_NUM_DATATYPE
+
1
];
int
len_of_binary
;
int
num_of_CPR
;
int
num_of_threads
;
...
...
@@ -432,7 +433,7 @@ int main(int argc, char *argv[]) {
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
printf
(
"###################################################################
\n\n
"
);
printf
(
"Press enter key to continue"
);
getchar
();
(
void
)
getchar
();
fprintf
(
fp
,
"###################################################################
\n
"
);
fprintf
(
fp
,
"# Server IP: %s:%hu
\n
"
,
ip_addr
==
NULL
?
"localhost"
:
ip_addr
,
port
);
...
...
@@ -550,8 +551,8 @@ int main(int argc, char *argv[]) {
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
info
*
t_info
=
infos
+
i
;
t_info
->
threadID
=
i
;
strcpy
(
t_info
->
db_name
,
db_name
);
strcpy
(
t_info
->
tb_prefix
,
tb_prefix
);
tstrncpy
(
t_info
->
db_name
,
db_name
,
MAX_DB_NAME_SIZE
);
tstrncpy
(
t_info
->
tb_prefix
,
tb_prefix
,
MAX_TB_NAME_SIZE
);
t_info
->
datatype
=
data_type
;
t_info
->
ncols_per_record
=
ncols_per_record
;
t_info
->
nrecords_per_table
=
nrecords_per_table
;
...
...
@@ -845,10 +846,10 @@ void *syncWrite(void *sarg) {
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
winfo
->
db_name
,
winfo
->
tb_prefix
,
tID
);
int
k
;
for
(
k
=
0
;
k
<
winfo
->
nrecords_per_request
;)
{
int
rand_num
=
rand
()
%
100
;
int
rand_num
=
t
rand
()
%
100
;
int
len
=
-
1
;
if
(
winfo
->
data_of_order
==
1
&&
rand_num
<
winfo
->
data_of_rate
)
{
long
d
=
tmp_time
-
rand
()
%
1000000
+
rand_num
;
long
d
=
tmp_time
-
t
rand
()
%
1000000
+
rand_num
;
len
=
generateData
(
data
,
data_type
,
ncols_per_record
,
d
,
len_of_binary
);
}
else
{
len
=
generateData
(
data
,
data_type
,
ncols_per_record
,
tmp_time
+=
1000
,
len_of_binary
);
...
...
@@ -940,10 +941,10 @@ void callBack(void *param, TAOS_RES *res, int code) {
pstr
+=
sprintf
(
pstr
,
"insert into %s values"
,
tb_info
->
tb_name
);
for
(
int
i
=
0
;
i
<
tb_info
->
nrecords_per_request
;
i
++
)
{
int
rand_num
=
rand
()
%
100
;
int
rand_num
=
t
rand
()
%
100
;
if
(
tb_info
->
data_of_order
==
1
&&
rand_num
<
tb_info
->
data_of_rate
)
{
long
d
=
tmp_time
-
rand
()
%
1000000
+
rand_num
;
long
d
=
tmp_time
-
t
rand
()
%
1000000
+
rand_num
;
generateData
(
data
,
datatype
,
ncols_per_record
,
d
,
len_of_binary
);
}
else
{
...
...
@@ -985,22 +986,27 @@ int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t times
}
}
if
(
0
==
c
)
{
perror
(
"data type error!"
);
exit
(
-
1
);
}
for
(
int
i
=
0
;
i
<
num_of_cols
;
i
++
)
{
if
(
strcasecmp
(
data_type
[
i
%
c
],
"tinyint"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
rand
()
%
128
));
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
t
rand
()
%
128
));
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"smallint"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
rand
()
%
32767
));
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
t
rand
()
%
32767
));
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"int"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
rand
()
%
10
));
pstr
+=
sprintf
(
pstr
,
", %d"
,
(
int
)(
t
rand
()
%
10
));
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"bigint"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
", %"
PRId64
,
rand
()
%
2147483648
);
pstr
+=
sprintf
(
pstr
,
", %"
PRId64
,
t
rand
()
%
2147483648
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"float"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
", %10.4f"
,
(
float
)(
rand
()
/
100
0
));
pstr
+=
sprintf
(
pstr
,
", %10.4f"
,
(
float
)(
trand
()
/
1000
.
0
));
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"double"
)
==
0
)
{
double
t
=
(
double
)(
rand
()
/
100000
0
);
double
t
=
(
double
)(
trand
()
/
1000000
.
0
);
pstr
+=
sprintf
(
pstr
,
", %20.8f"
,
t
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"bool"
)
==
0
)
{
bool
b
=
rand
()
&
1
;
bool
b
=
t
rand
()
&
1
;
pstr
+=
sprintf
(
pstr
,
", %s"
,
b
?
"true"
:
"false"
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"binary"
)
==
0
)
{
char
s
[
len_of_binary
];
...
...
@@ -1026,7 +1032,7 @@ void rand_string(char *str, int size) {
--
size
;
int
n
;
for
(
n
=
0
;
n
<
size
;
n
++
)
{
int
key
=
rand
()
%
(
int
)(
sizeof
charset
-
1
);
int
key
=
t
rand
()
%
(
int
)(
sizeof
charset
-
1
);
str
[
n
]
=
charset
[
key
];
}
str
[
n
]
=
0
;
...
...
src/kit/taosdump/taosdump.c
浏览文件 @
febd427b
...
...
@@ -229,7 +229,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
output
,
full_path
.
we_wordv
[
0
]
);
tstrncpy
(
arguments
->
output
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
wordfree
(
&
full_path
);
break
;
case
'i'
:
...
...
@@ -238,7 +238,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
input
,
full_path
.
we_wordv
[
0
]
);
tstrncpy
(
arguments
->
input
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
wordfree
(
&
full_path
);
break
;
case
'c'
:
...
...
@@ -246,7 +246,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
configDir
,
full_path
.
we_wordv
[
0
]
);
tstrncpy
(
configDir
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
wordfree
(
&
full_path
);
break
;
case
'e'
:
...
...
@@ -411,7 +411,7 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
if
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
isSet
=
true
;
pTableRecordInfo
->
isMetric
=
true
;
strcpy
(
pTableRecordInfo
->
tableRecord
.
metric
,
table
);
tstrncpy
(
pTableRecordInfo
->
tableRecord
.
metric
,
table
,
TSDB_TABLE_NAME_LEN
);
}
taos_free_result
(
result
);
...
...
@@ -537,11 +537,11 @@ int taosDumpOut(SDumpArguments *arguments) {
if
(
arguments
->
databases
||
arguments
->
all_databases
)
{
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
);
(
void
)
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
);
}
}
else
{
if
(
arguments
->
arg_list_len
==
1
)
{
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
);
(
void
)
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
);
}
else
{
taosDumpCreateDbClause
(
dbInfos
[
0
],
arguments
->
with_property
,
fp
);
...
...
@@ -560,9 +560,9 @@ int taosDumpOut(SDumpArguments *arguments) {
}
if
(
tableRecordInfo
.
isMetric
)
{
// dump whole metric
taosDumpMetric
(
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
(
void
)
taosDumpMetric
(
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
}
else
{
// dump MTable and NTable
taosDumpTable
(
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
(
void
)
taosDumpTable
(
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
}
}
}
...
...
@@ -642,17 +642,22 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
taos_free_result
(
result
);
lseek
(
fd
,
0
,
SEEK_SET
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
while
(
1
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
ssize_t
ret
=
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
if
(
ret
<=
0
)
break
;
tableRecord
.
name
[
sizeof
(
tableRecord
.
name
)
-
1
]
=
0
;
tableRecord
.
metric
[
sizeof
(
tableRecord
.
metric
)
-
1
]
=
0
;
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
arguments
,
fp
);
}
close
(
fd
);
remove
(
".table.tmp"
);
return
remove
(
".table.tmp"
)
;
return
0
;
}
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
...
...
@@ -807,7 +812,7 @@ int taosGetTableDes(char *table, STableDef *tableDes) {
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
strcpy
(
tableDes
->
name
,
table
);
tstrncpy
(
tableDes
->
name
,
table
,
TSDB_COL_NAME_LEN
);
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
strncpy
(
tableDes
->
cols
[
count
].
field
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
],
...
...
@@ -874,7 +879,7 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
int
fd
=
-
1
;
STableRecord
tableRecord
;
tstrncpy
(
tableRecord
.
metric
,
metric
,
TSDB_TABLE_NAME_LEN
);
//
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
sprintf
(
command
,
"select tbname from %s"
,
metric
);
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
...
...
@@ -895,24 +900,28 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
strcpy
(
tableRecord
.
metric
,
metric
);
t
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
tstrncpy
(
tableRecord
.
metric
,
metric
,
TSDB_TABLE_NAME_LEN
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
}
taos_free_result
(
result
);
result
=
NULL
;
lseek
(
fd
,
0
,
SEEK_SET
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
while
(
1
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
ssize_t
ret
=
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
if
(
ret
<=
0
)
break
;
tableRecord
.
name
[
sizeof
(
tableRecord
.
name
)
-
1
]
=
0
;
tableRecord
.
metric
[
sizeof
(
tableRecord
.
metric
)
-
1
]
=
0
;
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
arguments
,
fp
);
}
t
close
(
fd
);
remove
(
".table.tmp"
);
close
(
fd
);
(
void
)
remove
(
".table.tmp"
);
return
0
;
}
...
...
@@ -1004,7 +1013,7 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
break
;
}
}
pstr
+=
sprintf
(
pstr
,
")"
);
sprintf
(
pstr
,
")"
);
count
++
;
fprintf
(
fp
,
"%s"
,
buffer
);
...
...
@@ -1327,7 +1336,7 @@ int convertNCharToReadable(char *str, int size, char *buf, int bufsize) {
if
((
int
)
wc
<
256
)
{
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[(
int
)
wc
]);
}
else
{
}
else
if
(
byte_width
>
0
)
{
memcpy
(
pbuf
,
pstr
,
byte_width
);
pbuf
+=
byte_width
;
}
...
...
src/mnode/src/mnodeAcct.c
浏览文件 @
febd427b
...
...
@@ -126,8 +126,8 @@ int32_t mnodeInitAccts() {
}
void
mnodeCleanupAccts
()
{
sdbCloseTable
(
tsAcctSdb
);
acctCleanUp
();
sdbCloseTable
(
tsAcctSdb
);
}
void
*
mnodeGetAcct
(
char
*
name
)
{
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
febd427b
...
...
@@ -678,8 +678,9 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
handle
;
mnodeMsg
->
received
++
;
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
mnodeMsg
->
code
=
rpcMsg
->
code
;
mnodeMsg
->
successed
++
;
}
else
{
mnodeMsg
->
code
=
rpcMsg
->
code
;
}
SVgObj
*
pVgroup
=
mnodeMsg
->
pVgroup
;
...
...
@@ -702,7 +703,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
code
=
TSDB_CODE_MND_SDB_ERROR
;
}
dnodeSendRpcMnodeWriteRsp
(
mnodeMsg
,
code
);
dnodeSendRpcMnodeWriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
}
}
...
...
src/os/linux/src/linuxPlatform.c
浏览文件 @
febd427b
...
...
@@ -56,70 +56,8 @@ void taosMsleep(int mseconds) {
bool
taosCheckPthreadValid
(
pthread_t
thread
)
{
return
thread
!=
0
;
}
void
taosResetPthread
(
pthread_t
*
thread
)
{
*
thread
=
0
;
}
int64_t
taosGetPthreadId
()
{
return
(
int64_t
)
pthread_self
();
}
/*
* Function to get the private ip address of current machine. If get IP
* successfully, return 0, else, return -1. The return values is ip.
*
* Use:
* if (taosGetPrivateIp(ip) != 0) {
* perror("Fail to get private IP address\n");
* exit(EXIT_FAILURE);
* }
*/
int
taosGetPrivateIp
(
char
*
const
ip
)
{
bool
hasLoCard
=
false
;
struct
ifaddrs
*
ifaddr
,
*
ifa
;
int
family
,
s
;
char
host
[
NI_MAXHOST
];
if
(
getifaddrs
(
&
ifaddr
)
==
-
1
)
{
return
-
1
;
}
/* Walk through linked list, maintaining head pointer so we can free list later */
int
flag
=
0
;
for
(
ifa
=
ifaddr
;
ifa
!=
NULL
;
ifa
=
ifa
->
ifa_next
)
{
if
(
ifa
->
ifa_addr
==
NULL
)
continue
;
family
=
ifa
->
ifa_addr
->
sa_family
;
if
(
strcmp
(
"lo"
,
ifa
->
ifa_name
)
==
0
)
{
hasLoCard
=
true
;
continue
;
}
if
(
family
==
AF_INET
)
{
/* printf("%-8s", ifa->ifa_name); */
s
=
getnameinfo
(
ifa
->
ifa_addr
,
(
family
==
AF_INET
)
?
sizeof
(
struct
sockaddr_in
)
:
sizeof
(
struct
sockaddr_in6
),
host
,
NI_MAXHOST
,
NULL
,
0
,
NI_NUMERICHOST
);
if
(
s
!=
0
)
{
freeifaddrs
(
ifaddr
);
return
-
1
;
}
strcpy
(
ip
,
host
);
flag
=
1
;
break
;
}
}
freeifaddrs
(
ifaddr
);
if
(
flag
)
{
return
0
;
}
else
{
if
(
hasLoCard
)
{
uPrint
(
"no net card was found, use lo:127.0.0.1 as default"
);
strcpy
(
ip
,
"127.0.0.1"
);
return
0
;
}
return
-
1
;
}
}
int
taosSetNonblocking
(
int
sock
,
int
on
)
{
int
flags
=
0
;
if
((
flags
=
fcntl
(
sock
,
F_GETFL
,
0
))
<
0
)
{
...
...
@@ -294,21 +232,6 @@ ssize_t twrite(int fd, void *buf, size_t n) {
return
n
;
}
bool
taosSkipSocketCheck
()
{
struct
utsname
buf
;
if
(
uname
(
&
buf
))
{
uPrint
(
"can't fetch os info"
);
return
false
;
}
if
(
strstr
(
buf
.
release
,
"Microsoft"
)
!=
0
)
{
uPrint
(
"using WSLv1"
);
return
true
;
}
return
false
;
}
void
taosBlockSIGPIPE
()
{
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
...
...
src/os/linux/src/linuxSysPara.c
浏览文件 @
febd427b
...
...
@@ -162,7 +162,13 @@ static void taosGetSystemTimezone() {
FILE
*
f
=
fopen
(
"/etc/timezone"
,
"r"
);
char
buf
[
65
]
=
{
0
};
if
(
f
!=
NULL
)
{
fread
(
buf
,
64
,
1
,
f
);
int
len
=
fread
(
buf
,
64
,
1
,
f
);
if
(
len
<
64
&&
ferror
(
f
))
{
fclose
(
f
);
uError
(
"read /etc/timezone error, reason:%s"
,
strerror
(
errno
));
return
;
}
fclose
(
f
);
}
...
...
@@ -547,7 +553,7 @@ void taosSetCoreDump() {
struct
rlimit
rlim
;
struct
rlimit
rlim_new
;
if
(
getrlimit
(
RLIMIT_CORE
,
&
rlim
)
==
0
)
{
uPrint
(
"the old unlimited para: rlim_cur=%
d, rlim_max=%d"
,
rlim
.
rlim_cur
,
rlim
.
rlim_max
);
uPrint
(
"the old unlimited para: rlim_cur=%
"
PRIu64
,
", rlim_max=%"
PRIu64
,
rlim
.
rlim_cur
,
rlim
.
rlim_max
);
rlim_new
.
rlim_cur
=
RLIM_INFINITY
;
rlim_new
.
rlim_max
=
RLIM_INFINITY
;
if
(
setrlimit
(
RLIMIT_CORE
,
&
rlim_new
)
!=
0
)
{
...
...
@@ -559,7 +565,7 @@ void taosSetCoreDump() {
}
if
(
getrlimit
(
RLIMIT_CORE
,
&
rlim
)
==
0
)
{
uPrint
(
"the new unlimited para: rlim_cur=%
d, rlim_max=%d"
,
rlim
.
rlim_cur
,
rlim
.
rlim_max
);
uPrint
(
"the new unlimited para: rlim_cur=%
"
PRIu64
,
", rlim_max=%"
PRIu64
,
rlim
.
rlim_cur
,
rlim
.
rlim_max
);
}
#ifndef _TD_ARM_
...
...
@@ -586,7 +592,7 @@ void taosSetCoreDump() {
uPrint
(
"_sysctl(kern_core_uses_pid) set fail: %s"
,
strerror
(
errno
));
}
uPrint
(
"The old core_uses_pid[%
d
]: %d"
,
old_len
,
old_usespid
);
uPrint
(
"The old core_uses_pid[%
"
PRIu64
"
]: %d"
,
old_len
,
old_usespid
);
old_usespid
=
0
;
...
...
@@ -603,7 +609,7 @@ void taosSetCoreDump() {
uPrint
(
"_sysctl(kern_core_uses_pid) get fail: %s"
,
strerror
(
errno
));
}
uPrint
(
"The new core_uses_pid[%
d
]: %d"
,
old_len
,
old_usespid
);
uPrint
(
"The new core_uses_pid[%
"
PRIu64
"
]: %d"
,
old_len
,
old_usespid
);
#endif
#if 0
...
...
src/query/inc/qExecutor.h
浏览文件 @
febd427b
...
...
@@ -102,7 +102,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int64_t
tag
;
STimeWindow
win
;
STSCursor
cur
;
STableId
id
;
// for retrieve the page id list
void
*
pTable
;
// for retrieve the page id list
SWindowResInfo
windowResInfo
;
}
STableQueryInfo
;
...
...
@@ -126,10 +126,10 @@ typedef struct SQueryCostInfo {
uint64_t
computTime
;
}
SQueryCostInfo
;
typedef
struct
SGroupItem
{
STableId
id
;
STableQueryInfo
*
info
;
}
SGroupItem
;
//
typedef struct SGroupItem {
// void *pTable
;
// STableQueryInfo *
info;
//
} SGroupItem;
typedef
struct
SQuery
{
int16_t
numOfCols
;
...
...
@@ -187,8 +187,8 @@ typedef struct SQInfo {
void
*
tsdb
;
int32_t
vgId
;
STableGroupInfo
table
IdGroupInfo
;
// table id list < only includes the STableId
list>
STableGroupInfo
groupInfo
;
//
STableGroupInfo
table
GroupInfo
;
// table id list < only includes the STable
list>
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv
runtimeEnv
;
int32_t
groupIndex
;
int32_t
offset
;
// offset in group result set of subgroup, todo refactor
...
...
src/query/src/qExecutor.c
浏览文件 @
febd427b
此差异已折叠。
点击以展开。
src/query/src/qUtil.c
浏览文件 @
febd427b
...
...
@@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
if
(
pResult
->
status
.
closed
)
{
// remove the window slot from hash table
taosHashRemove
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
pWindowResInfo
->
type
);
printf
(
"remove ============>%ld, remain size:%ld
\n
"
,
pResult
->
window
.
skey
,
pWindowResInfo
->
hashList
->
size
);
}
else
{
break
;
}
...
...
src/query/src/qresultBuf.c
浏览文件 @
febd427b
...
...
@@ -72,7 +72,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if
(
ret
!=
0
)
{
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return
-
TSDB_CODE_QRY_NO_DISKSPACE
;
return
TSDB_CODE_QRY_NO_DISKSPACE
;
}
pResultBuf
->
totalBufSize
=
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_PAGE_SIZE
;
...
...
@@ -80,7 +80,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if
(
pResultBuf
->
pBuf
==
MAP_FAILED
)
{
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return
-
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
febd427b
...
...
@@ -158,18 +158,16 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
return
0
;
}
void
*
tsdbGetTableTagVal
(
TSDB_REPO_T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
)
{
void
*
tsdbGetTableTagVal
(
const
void
*
pTable
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
)
{
// TODO: this function should be changed also
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
);
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
pTable
);
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
(
STable
*
)
pTable
);
STColumn
*
pCol
=
tdGetColOfID
(
pSchema
,
colId
);
if
(
pCol
==
NULL
)
{
return
NULL
;
// No matched tag volumn
}
char
*
val
=
tdGetKVRowValOfCol
(
pTable
->
tagVal
,
colId
);
char
*
val
=
tdGetKVRowValOfCol
(
((
STable
*
)
pTable
)
->
tagVal
,
colId
);
assert
(
type
==
pCol
->
type
&&
bytes
==
pCol
->
bytes
);
if
(
val
!=
NULL
&&
IS_VAR_DATA_TYPE
(
type
))
{
...
...
@@ -179,20 +177,21 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i
return
val
;
}
char
*
tsdbGetTableName
(
TSDB_REPO_T
*
repo
,
const
STableId
*
id
)
{
char
*
tsdbGetTableName
(
void
*
pTable
)
{
// TODO: need to change as thread-safe
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
);
if
(
pTable
==
NULL
)
{
return
NULL
;
}
else
{
return
(
char
*
)
pTable
->
name
;
return
(
char
*
)
(((
STable
*
)
pTable
)
->
name
)
;
}
}
STableId
tsdbGetTableId
(
void
*
pTable
)
{
assert
(
pTable
);
return
((
STable
*
)
pTable
)
->
tableId
;
}
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
)
return
NULL
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
febd427b
...
...
@@ -184,15 +184,17 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
assert
(
gsize
>
0
);
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STable
Id
*
id
=
(
STableId
*
)
taosArrayGet
(
group
,
j
);
STable
*
pTable
=
(
STable
*
)
taosArrayGetP
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
*
i
d
,
.
pTableObj
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
)
,
.
tableId
=
pTable
->
tableI
d
,
.
pTableObj
=
pTable
,
};
assert
(
info
.
pTableObj
!=
NULL
&&
info
.
pTableObj
->
tableId
.
tid
==
id
->
tid
);
assert
(
info
.
pTableObj
!=
NULL
&&
(
info
.
pTableObj
->
type
==
TSDB_NORMAL_TABLE
||
info
.
pTableObj
->
type
==
TSDB_CHILD_TABLE
));
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
}
}
...
...
@@ -215,17 +217,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
return
pQueryHandle
;
}
SArray
*
tsdbGetQueriedTable
Id
List
(
TsdbQueryHandleT
*
pHandle
)
{
SArray
*
tsdbGetQueriedTableList
(
TsdbQueryHandleT
*
pHandle
)
{
assert
(
pHandle
!=
NULL
);
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
pHandle
;
size_t
size
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
SArray
*
res
=
taosArrayInit
(
size
,
sizeof
(
STableId
)
);
SArray
*
res
=
taosArrayInit
(
size
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
taosArrayPush
(
res
,
&
pCheckInfo
->
tableId
);
taosArrayPush
(
res
,
&
pCheckInfo
->
pTableObj
);
}
return
res
;
...
...
@@ -1052,7 +1054,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t
end
=
doBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
key
,
order
);
if
(
tsArray
[
end
]
==
key
)
{
// the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext
(
pCheckInfo
->
iter
);
moveToNextRow
(
pCheckInfo
);
}
int32_t
start
=
-
1
;
...
...
@@ -1607,6 +1609,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
}
if
(
index
==
-
1
)
{
// todo add failure test cases
return
;
}
...
...
@@ -1856,42 +1859,19 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
SArray
*
tsdbRetrieveDataRow
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
)
{
return
NULL
;
}
TsdbQueryHandleT
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
)
{
return
NULL
;
}
SArray
*
tsdbGetTableList
(
TsdbQueryHandleT
*
pQueryHandle
)
{
return
NULL
;
}
static
int32_t
getAllTableIdList
(
STable
*
pSuperTable
,
SArray
*
list
)
{
static
int32_t
getAllTableList
(
STable
*
pSuperTable
,
SArray
*
list
)
{
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
pSuperTable
->
pIndex
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
iter
);
STable
**
pTable
=
(
STable
**
)
SL_GET_NODE_DATA
((
SSkipListNode
*
)
pNode
);
taosArrayPush
(
list
,
&
(
*
pTable
)
->
tableId
);
taosArrayPush
(
list
,
pTable
);
}
tSkipListDestroyIter
(
iter
);
return
TSDB_CODE_SUCCESS
;
}
/**
* convert the result pointer to table id instead of table object pointer
* todo remove it by using callback function to change the final result in-time.
* @param pRes
*/
static
void
convertQueryResult
(
SArray
*
pRes
,
SArray
*
pTableList
)
{
if
(
pTableList
==
NULL
||
taosArrayGetSize
(
pTableList
)
==
0
)
{
return
;
}
size_t
size
=
taosArrayGetSize
(
pTableList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
// todo speedup by using reserve space.
STable
*
pTable
=
taosArrayGetP
(
pTableList
,
i
);
taosArrayPush
(
pRes
,
&
pTable
->
tableId
);
}
}
static
void
destroyHelper
(
void
*
param
)
{
if
(
param
==
NULL
)
{
return
;
...
...
@@ -1960,16 +1940,13 @@ typedef struct STableGroupSupporter {
int32_t
numOfCols
;
SColIndex
*
pCols
;
STSchema
*
pTagSchema
;
void
*
tsdbMeta
;
//
void* tsdbMeta;
}
STableGroupSupporter
;
int32_t
tableGroupComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
STableGroupSupporter
*
pTableGroupSupp
=
(
STableGroupSupporter
*
)
param
;
STableId
*
id1
=
(
STableId
*
)
p1
;
STableId
*
id2
=
(
STableId
*
)
p2
;
STable
*
pTable1
=
tsdbGetTableByUid
(
pTableGroupSupp
->
tsdbMeta
,
id1
->
uid
);
STable
*
pTable2
=
tsdbGetTableByUid
(
pTableGroupSupp
->
tsdbMeta
,
id2
->
uid
);
STable
*
pTable1
=
*
(
STable
**
)
p1
;
STable
*
pTable2
=
*
(
STable
**
)
p2
;
for
(
int32_t
i
=
0
;
i
<
pTableGroupSupp
->
numOfCols
;
++
i
)
{
SColIndex
*
pColIndex
=
&
pTableGroupSupp
->
pCols
[
i
];
...
...
@@ -2019,26 +1996,29 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return
0
;
}
void
createTableGroupImpl
(
SArray
*
pGroups
,
SArray
*
pTable
Id
List
,
size_t
numOfTables
,
STableGroupSupporter
*
pSupp
,
void
createTableGroupImpl
(
SArray
*
pGroups
,
SArray
*
pTableList
,
size_t
numOfTables
,
STableGroupSupporter
*
pSupp
,
__ext_compar_fn_t
compareFn
)
{
STableId
*
pId
=
taosArrayGet
(
pTableIdList
,
0
);
SArray
*
g
=
taosArrayInit
(
16
,
sizeof
(
STableId
));
taosArrayPush
(
g
,
pId
);
STable
*
pTable
=
taosArrayGetP
(
pTableList
,
0
);
SArray
*
g
=
taosArrayInit
(
16
,
POINTER_BYTES
);
taosArrayPush
(
g
,
&
pTable
);
tsdbRefTable
(
pTable
);
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
STable
Id
*
prev
=
taosArrayGet
(
pTableId
List
,
i
-
1
);
STable
Id
*
p
=
taosArrayGet
(
pTableId
List
,
i
);
STable
**
prev
=
taosArrayGet
(
pTable
List
,
i
-
1
);
STable
**
p
=
taosArrayGet
(
pTable
List
,
i
);
int32_t
ret
=
compareFn
(
prev
,
p
,
pSupp
);
assert
(
ret
==
0
||
ret
==
-
1
);
tsdbRefTable
(
*
p
);
assert
((
*
p
)
->
type
==
TSDB_CHILD_TABLE
);
if
(
ret
==
0
)
{
taosArrayPush
(
g
,
p
);
}
else
{
taosArrayPush
(
pGroups
,
&
g
);
// current group is ended, start a new group
g
=
taosArrayInit
(
16
,
sizeof
(
STableId
));
g
=
taosArrayInit
(
16
,
POINTER_BYTES
);
taosArrayPush
(
g
,
p
);
}
}
...
...
@@ -2046,8 +2026,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab
taosArrayPush
(
pGroups
,
&
g
);
}
SArray
*
createTableGroup
(
SArray
*
pTableList
,
STSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
,
TSDB_REPO_T
*
tsdb
)
{
SArray
*
createTableGroup
(
SArray
*
pTableList
,
STSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
)
{
assert
(
pTableList
!=
NULL
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
...
...
@@ -2058,22 +2037,24 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if
(
numOfOrderCols
==
0
||
size
==
1
)
{
// no group by tags clause or only one table
SArray
*
sa
=
taosArrayInit
(
size
,
sizeof
(
STableId
)
);
SArray
*
sa
=
taosArrayInit
(
size
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableId
*
tableId
=
taosArrayGet
(
pTableList
,
i
);
taosArrayPush
(
sa
,
tableId
);
STable
**
pTable
=
taosArrayGet
(
pTableList
,
i
);
assert
((
*
pTable
)
->
type
==
TSDB_CHILD_TABLE
);
tsdbRefTable
(
*
pTable
);
taosArrayPush
(
sa
,
pTable
);
}
taosArrayPush
(
pTableGroup
,
&
sa
);
tsdbTrace
(
"all %zu tables belong to one group"
,
size
);
}
else
{
STableGroupSupporter
*
pSupp
=
(
STableGroupSupporter
*
)
calloc
(
1
,
sizeof
(
STableGroupSupporter
));
pSupp
->
tsdbMeta
=
tsdbGetMeta
(
tsdb
);
pSupp
->
numOfCols
=
numOfOrderCols
;
pSupp
->
pTagSchema
=
pTagSchema
;
pSupp
->
pCols
=
pCols
;
taosqsort
(
pTableList
->
pData
,
size
,
sizeof
(
STableId
)
,
pSupp
,
tableGroupComparFn
);
taosqsort
(
pTableList
->
pData
,
size
,
POINTER_BYTES
,
pSupp
,
tableGroupComparFn
);
createTableGroupImpl
(
pTableGroup
,
pTableList
,
size
,
pSupp
,
tableGroupComparFn
);
tfree
(
pSupp
);
}
...
...
@@ -2149,48 +2130,53 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
.
pExtInfo
=
pSTable
->
tagSchema
,
};
SArray
*
pTableList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
tExprTreeTraverse
(
pExpr
,
pSTable
->
pIndex
,
pTableList
,
&
supp
);
tExprTreeTraverse
(
pExpr
,
pSTable
->
pIndex
,
pRes
,
&
supp
);
tExprTreeDestroy
(
&
pExpr
,
destroyHelper
);
convertQueryResult
(
pRes
,
pTableList
);
taosArrayDestroy
(
pTableList
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbQuerySTableByTagCond
(
TSDB_REPO_T
*
tsdb
,
uint64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
if
(
tsdbRLockRepoMeta
(
tsdb
)
<
0
)
goto
_error
;
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
if
(
pTable
==
NULL
)
{
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
,
tsdb
,
uid
);
return
TSDB_CODE_TDB_INVALID_TABLE_ID
;
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
tsdbUnlockRepoMeta
(
tsdb
);
goto
_error
;
}
if
(
pTable
->
type
!=
TSDB_SUPER_TABLE
)
{
tsdbError
(
"%p query normal tag not allowed, uid:%"
PRIu64
", tid:%d, name:%s"
,
tsdb
,
uid
,
pTable
->
tableId
.
tid
,
pTable
->
name
->
data
);
return
TSDB_CODE_COM_OPS_NOT_SUPPORT
;
//basically, this error is caused by invalid sql issued by client
terrno
=
TSDB_CODE_COM_OPS_NOT_SUPPORT
;
//basically, this error is caused by invalid sql issued by client
tsdbUnlockRepoMeta
(
tsdb
);
goto
_error
;
}
SArray
*
res
=
taosArrayInit
(
8
,
sizeof
(
STableId
));
//NOTE: not add ref count for super table
SArray
*
res
=
taosArrayInit
(
8
,
POINTER_BYTES
);
STSchema
*
pTagSchema
=
tsdbGetTableTagSchema
(
pTable
);
// no tags and tbname condition, all child tables of this stable are involved
if
(
tbnameCond
==
NULL
&&
(
pTagCond
==
NULL
||
len
==
0
))
{
int32_t
ret
=
getAllTableIdList
(
pTable
,
res
);
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
pGroupInfo
->
numOfTables
=
taosArrayGetSize
(
res
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
,
tsdb
);
tsdbTrace
(
"%p no table name/tag condition, all tables belong to one group, numOfTables:%zu"
,
tsdb
,
pGroupInfo
->
numOfTables
);
}
else
{
// todo add error
int32_t
ret
=
getAllTableList
(
pTable
,
res
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tsdbUnlockRepoMeta
(
tsdb
);
goto
_error
;
}
pGroupInfo
->
numOfTables
=
taosArrayGetSize
(
res
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
);
tsdbTrace
(
"%p no table name/tag condition, all tables belong to one group, numOfTables:%zu"
,
tsdb
,
pGroupInfo
->
numOfTables
);
taosArrayDestroy
(
res
);
if
(
tsdbUnlockRepoMeta
(
tsdb
)
<
0
)
goto
_error
;
return
ret
;
}
...
...
@@ -2227,31 +2213,45 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
doQueryTableList
(
pTable
,
res
,
expr
);
pGroupInfo
->
numOfTables
=
taosArrayGetSize
(
res
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
,
tsdb
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
);
tsdbTrace
(
"%p stable tid:%d, uid:%"
PRIu64
" query, numOfTables:%zu, belong to %zu groups"
,
tsdb
,
pTable
->
tableId
.
tid
,
pTable
->
tableId
.
uid
,
pGroupInfo
->
numOfTables
,
taosArrayGetSize
(
pGroupInfo
->
pGroupList
));
taosArrayDestroy
(
res
);
if
(
tsdbUnlockRepoMeta
(
tsdb
)
<
0
)
goto
_error
;
return
ret
;
_error:
return
terrno
;
}
int32_t
tsdbGetOneTableGroup
(
TSDB_REPO_T
*
tsdb
,
uint64_t
uid
,
STableGroupInfo
*
pGroupInfo
)
{
if
(
tsdbRLockRepoMeta
(
tsdb
)
<
0
)
goto
_error
;
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_TDB_INVALID_TABLE_ID
;
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
goto
_error
;
}
//todo assert table type, add the table ref count
assert
(
pTable
->
type
==
TSDB_CHILD_TABLE
||
pTable
->
type
==
TSDB_NORMAL_TABLE
);
tsdbRefTable
(
pTable
);
if
(
tsdbUnlockRepoMeta
(
tsdb
)
<
0
)
goto
_error
;
pGroupInfo
->
numOfTables
=
1
;
pGroupInfo
->
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
group
=
taosArrayInit
(
1
,
sizeof
(
STableId
)
);
SArray
*
group
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
group
,
&
pTable
->
tableId
);
taosArrayPush
(
group
,
&
pTable
);
taosArrayPush
(
pGroupInfo
->
pGroupList
,
&
group
);
return
TSDB_CODE_SUCCESS
;
_error:
return
terrno
;
}
void
tsdbCleanupQueryHandle
(
TsdbQueryHandleT
queryHandle
)
{
...
...
@@ -2263,12 +2263,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
size_t
size
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableCheckInfo
*
pTableCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
tSkipListDestroyIter
(
pTableCheckInfo
->
iter
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pTableCheckInfo
->
mem
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pTableCheckInfo
->
imem
);
tSkipListDestroyIter
(
pTableCheckInfo
->
iter
);
if
(
pTableCheckInfo
->
pDataCols
!=
NULL
)
{
tfree
(
pTableCheckInfo
->
pDataCols
->
buf
);
}
...
...
@@ -2293,3 +2292,26 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree
(
pQueryHandle
);
}
void
tsdbDestoryTableGroup
(
STableGroupInfo
*
pGroupList
)
{
assert
(
pGroupList
!=
NULL
);
size_t
numOfGroup
=
taosArrayGetSize
(
pGroupList
->
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroup
;
++
i
)
{
SArray
*
p
=
taosArrayGetP
(
pGroupList
->
pGroupList
,
i
);
size_t
numOfTables
=
taosArrayGetSize
(
p
);
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STable
*
pTable
=
taosArrayGetP
(
p
,
j
);
assert
(
pTable
!=
NULL
);
tsdbUnRefTable
(
pTable
);
}
taosArrayDestroy
(
p
);
}
taosArrayDestroy
(
pGroupList
->
pGroupList
);
}
src/util/inc/tutil.h
浏览文件 @
febd427b
...
...
@@ -119,6 +119,8 @@ extern "C" {
uint32_t
taosRand
(
void
);
uint32_t
trand
(
void
);
size_t
twcslen
(
const
wchar_t
*
wcs
);
int32_t
strdequote
(
char
*
src
);
...
...
src/util/src/tlog.c
浏览文件 @
febd427b
...
...
@@ -147,7 +147,7 @@ static void *taosThreadToOpenNewFile(void *param) {
return
NULL
;
}
taosLockFile
(
fd
);
lseek
(
fd
,
0
,
SEEK_SET
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
int32_t
oldFd
=
tsLogObj
.
logHandle
->
fd
;
tsLogObj
.
logHandle
->
fd
=
fd
;
...
...
src/util/src/tnote.c
浏览文件 @
febd427b
...
...
@@ -92,7 +92,7 @@ void *taosThreadToOpenNewNote(void *param)
}
taosLockNote
(
fd
,
pNote
);
lseek
(
fd
,
0
,
SEEK_SET
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
int
oldFd
=
pNote
->
taosNoteFd
;
pNote
->
taosNoteFd
=
fd
;
...
...
@@ -128,10 +128,12 @@ int taosOpenNewNote(taosNoteInfo * pNote)
bool
taosCheckNoteIsOpen
(
char
*
noteName
,
taosNoteInfo
*
pNote
)
{
/*
int exist = access(noteName, F_OK);
if (exist != 0) {
return false;
}
*/
int
fd
=
open
(
noteName
,
O_WRONLY
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
fd
<
0
)
{
...
...
src/util/src/ttimer.c
浏览文件 @
febd427b
...
...
@@ -385,8 +385,8 @@ static void taosTimerLoopFunc(int signo) {
timer
=
next
;
}
pthread_mutex_unlock
(
&
wheel
->
mutex
);
wheel
->
nextScanAt
+=
wheel
->
resolution
;
pthread_mutex_unlock
(
&
wheel
->
mutex
);
}
addToExpired
(
expired
);
...
...
@@ -514,17 +514,14 @@ static void taosTmrModuleInit(void) {
tmrError
(
"failed to create the mutex for wheel, reason:%s"
,
strerror
(
errno
));
return
;
}
pthread_mutex_lock
(
&
wheel
->
mutex
);
wheel
->
nextScanAt
=
now
+
wheel
->
resolution
;
wheel
->
index
=
0
;
wheel
->
slots
=
(
tmr_obj_t
**
)
calloc
(
wheel
->
size
,
sizeof
(
tmr_obj_t
*
));
if
(
wheel
->
slots
==
NULL
)
{
tmrError
(
"failed to allocate wheel slots"
);
pthread_mutex_unlock
(
&
wheel
->
mutex
);
return
;
}
timerMap
.
size
+=
wheel
->
size
;
pthread_mutex_unlock
(
&
wheel
->
mutex
);
}
timerMap
.
count
=
0
;
...
...
src/util/src/tutil.c
浏览文件 @
febd427b
...
...
@@ -55,6 +55,25 @@ uint32_t taosRand(void)
*/
return
rand
();
}
uint32_t
trand
(
void
)
{
int
fd
;
int
seed
;
fd
=
open
(
"/dev/urandom"
,
0
);
if
(
fd
<
0
)
{
seed
=
time
(
0
);
}
else
{
int
len
=
read
(
fd
,
&
seed
,
sizeof
(
seed
));
if
(
len
<
0
)
{
seed
=
time
(
0
);
}
close
(
fd
);
}
return
(
uint32_t
)
seed
;
}
#endif
size_t
twcslen
(
const
wchar_t
*
wcs
)
{
...
...
tests/pytest/stream/stream1.py
浏览文件 @
febd427b
...
...
@@ -58,9 +58,13 @@ class TDTestCase:
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step4 ====="
)
tdSql
.
execute
(
"drop table s0"
)
...
...
@@ -82,9 +86,12 @@ class TDTestCase:
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step8 ====="
)
tdSql
.
query
(
...
...
@@ -105,9 +112,12 @@ class TDTestCase:
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step10 ====="
)
tdSql
.
execute
(
"drop table s1"
)
...
...
@@ -127,9 +137,12 @@ class TDTestCase:
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/pytest/stream/stream2.py
浏览文件 @
febd427b
...
...
@@ -55,12 +55,18 @@ class TDTestCase:
tdLog
.
info
(
"===== step3 ====="
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step4 ====="
)
tdSql
.
execute
(
"drop table s0"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
)
try
:
tdSql
.
checkRows
(
tbNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step5 ====="
)
tdSql
.
error
(
"select * from s0"
)
...
...
@@ -69,21 +75,30 @@ class TDTestCase:
tdSql
.
execute
(
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
try
:
tdSql
.
checkRows
(
tbNum
+
1
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step7 ====="
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step8 ====="
)
tdSql
.
query
(
"select count(*), count(col1), count(col2) from stb0 interval(1d)"
)
tdSql
.
checkData
(
0
,
1
,
totalNum
)
tdSql
.
checkData
(
0
,
2
,
totalNum
)
tdSql
.
checkData
(
0
,
3
,
totalNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
totalNum
)
tdSql
.
checkData
(
0
,
2
,
totalNum
)
tdSql
.
checkData
(
0
,
3
,
totalNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
tdSql
.
execute
(
...
...
@@ -94,14 +109,20 @@ class TDTestCase:
tdLog
.
info
(
"===== step9 ====="
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
totalNum
)
tdSql
.
checkData
(
0
,
2
,
totalNum
)
tdSql
.
checkData
(
0
,
3
,
totalNum
)
try
:
tdSql
.
checkData
(
0
,
1
,
totalNum
)
tdSql
.
checkData
(
0
,
2
,
totalNum
)
tdSql
.
checkData
(
0
,
3
,
totalNum
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step10 ====="
)
tdSql
.
execute
(
"drop table s1"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
try
:
tdSql
.
checkRows
(
tbNum
+
1
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step11 ====="
)
tdSql
.
error
(
"select * from s1"
)
...
...
@@ -110,14 +131,20 @@ class TDTestCase:
tdSql
.
execute
(
"create table s1 as select count(col1) from stb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
2
)
try
:
tdSql
.
checkRows
(
tbNum
+
2
)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
"===== step13 ====="
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
totalNum
)
#tdSql.checkData(0, 2, None)
#tdSql.checkData(0, 3, None)
try
:
tdSql
.
checkData
(
0
,
1
,
totalNum
)
#tdSql.checkData(0, 2, None)
#tdSql.checkData(0, 3, None)
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/pytest/table/boundary.py
浏览文件 @
febd427b
...
...
@@ -141,7 +141,7 @@ class TDTestCase:
tdSql
.
prepare
()
# 8 bytes for timestamp
maxRowSize
=
65535
-
8
maxRowSize
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_BYTES_PER_ROW'
)
-
8
maxCols
=
self
.
getLimitFromSourceCode
(
'TSDB_MAX_COLUMNS'
)
-
1
# for binary cols, 2 bytes are used for length
...
...
tests/script/general/parser/commit.sim
浏览文件 @
febd427b
...
...
@@ -68,6 +68,7 @@ while $loop <= $loops
while $i < 10
sql select count(*) from $stb where t1 = $i
if $data00 != $rowNum then
print expect $rowNum, actual: $data00
return -1
endi
$i = $i + 1
...
...
tests/script/general/parser/selectResNum.sim
浏览文件 @
febd427b
...
...
@@ -172,6 +172,7 @@ while $loop <= $loops
endi
sql select c8 from $stb where t1 = $i
if $rows != $rowNum then
print expect $rowNum, actual: $rows
return -1
endi
sql select c9 from $stb where t1 = $i
...
...
tests/script/general/stable/refcount.sim
0 → 100644
浏览文件 @
febd427b
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print =============== step1
sql create database d1;
sql use d1;
sql create table d1.t1 (ts timestamp, i int);
sql create table d1.t2 (ts timestamp, i int);
sql create table d1.t3 (ts timestamp, i int);
sql insert into d1.t1 values(now, 1);
sql insert into d1.t2 values(now, 1);
sql drop table d1.t1;
sql drop database d1;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step2
sql create database d2;
sql use d2;
sql create table d2.t1 (ts timestamp, i int);
sql create table d2.t2 (ts timestamp, i int);
sql create table d2.t3 (ts timestamp, i int);
sql insert into d2.t1 values(now, 1);
sql insert into d2.t2 values(now, 1);
sql drop table d2.t1;
sql drop table d2.t2;
sql drop table d2.t3;
sql show d2.tables;
if $rows != 0 then
return -1
endi
sql show d2.vgroups;
if $rows != 0 then
return -1
endi
sql drop database d2;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step3
sql create database d3;
sql use d3;
sql create table d3.st (ts timestamp, i int) tags (j int);
sql create table d3.t1 using d3.st tags(1);
sql create table d3.t2 using d3.st tags(1);
sql create table d3.t3 using d3.st tags(1);
sql insert into d3.t1 values(now, 1);
sql drop table d3.t1;
sql drop table d3.t2;
sql drop table d3.t3;
sql show d3.tables;
if $rows != 0 then
return -1
endi
sql show d3.vgroups;
if $rows != 0 then
return -1
endi
sql drop database d3;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step4
sql create database d4;
sql use d4;
sql create table d4.st (ts timestamp, i int) tags (j int);
sql create table d4.t1 using d4.st tags(1);
sql create table d4.t2 using d4.st tags(1);
sql create table d4.t3 using d4.st tags(1);
sql insert into d4.t1 values(now, 1);
sql drop table d4.t1;
sql drop table d4.st;
sql show d4.tables;
if $rows != 0 then
return -1
endi
sql show d4.stables;
if $rows != 0 then
return -1
endi
sql drop database d4;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step5
sql create database d5;
sql use d5;
sql create table d5.st (ts timestamp, i int) tags (j int);
sql create table d5.t1 using d5.st tags(1);
sql create table d5.t2 using d5.st tags(1);
sql create table d5.t3 using d5.st tags(1);
sql insert into d5.t1 values(now, 1);
sql drop table d5.t1;
sql drop database d5;
sql show databases;
if $rows != 0 then
return -1
endi
print =============== step6
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/unique/cluster/balance2.sim
浏览文件 @
febd427b
...
...
@@ -328,7 +328,7 @@ $x = 0
show6:
$x = $x + 1
sleep 2000
if $x ==
3
0 then
if $x ==
1
0 then
return -1
endi
sql show dnodes -x show6
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录