Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
21658047
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
21658047
编写于
6月 03, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/row_refact
上级
b8c727ac
2b630fef
变更
39
展开全部
隐藏空白更改
内联
并排
Showing
39 changed file
with
2249 addition
and
356 deletion
+2249
-356
include/client/taos.h
include/client/taos.h
+0
-1
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-3
include/util/tutil.h
include/util/tutil.h
+3
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+16
-4
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+6
-3
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+150
-91
source/client/src/clientMain.c
source/client/src/clientMain.c
+205
-16
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+42
-17
source/client/src/clientSml.c
source/client/src/clientSml.c
+2
-2
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+2
-3
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+74
-14
source/client/test/smlTest.cpp
source/client/test/smlTest.cpp
+15
-15
source/common/src/systable.c
source/common/src/systable.c
+1
-1
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+1
-1
source/dnode/mnode/impl/test/acct/CMakeLists.txt
source/dnode/mnode/impl/test/acct/CMakeLists.txt
+4
-6
source/dnode/mnode/impl/test/func/CMakeLists.txt
source/dnode/mnode/impl/test/func/CMakeLists.txt
+2
-2
source/dnode/mnode/impl/test/profile/CMakeLists.txt
source/dnode/mnode/impl/test/profile/CMakeLists.txt
+2
-2
source/dnode/mnode/impl/test/show/CMakeLists.txt
source/dnode/mnode/impl/test/show/CMakeLists.txt
+2
-2
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+9
-4
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+19
-33
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+6
-5
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+8
-10
source/libs/parser/src/parAstParser.c
source/libs/parser/src/parAstParser.c
+4
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+0
-4
source/libs/parser/test/parSelectTest.cpp
source/libs/parser/test/parSelectTest.cpp
+2
-0
source/libs/parser/test/parTestMain.cpp
source/libs/parser/test/parTestMain.cpp
+1
-0
source/libs/planner/test/CMakeLists.txt
source/libs/planner/test/CMakeLists.txt
+2
-2
source/libs/planner/test/planTestMain.cpp
source/libs/planner/test/planTestMain.cpp
+2
-0
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+1
-1
source/libs/scheduler/src/schDbg.c
source/libs/scheduler/src/schDbg.c
+0
-9
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+4
-5
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+23
-17
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+1
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/sync/3Replica5VgElect.sim
tests/script/tsim/sync/3Replica5VgElect.sim
+1
-0
tests/script/tsim/sync/3Replica5VgElect3mnode.sim
tests/script/tsim/sync/3Replica5VgElect3mnode.sim
+906
-0
tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim
tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim
+627
-0
tests/script/tsim/testsuit.sim
tests/script/tsim/testsuit.sim
+103
-80
未找到文件。
include/client/taos.h
浏览文件 @
21658047
...
...
@@ -162,7 +162,6 @@ DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT
int
taos_stmt_affected_rows_once
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_query
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_result_precision
(
TAOS_RES
*
res
);
// get the time precision of result
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
21658047
...
...
@@ -100,7 +100,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
*/
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
data
);
int32_t
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
);
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
);
int32_t
schedulerGetTasksStatus
(
int64_t
job
,
SArray
*
pSub
);
...
...
@@ -121,8 +121,6 @@ void schedulerFreeJob(int64_t job);
void
schedulerDestroy
(
void
);
void
schdExecCallback
(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
);
void
schdFetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
);
#ifdef __cplusplus
}
...
...
include/util/tutil.h
浏览文件 @
21658047
...
...
@@ -57,11 +57,13 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
inBuf
,
(
uint32_t
)
len
);
tMD5Final
(
&
context
);
char
buf
[
TSDB_PASSWORD_LEN
+
1
];
sprintf
(
target
,
"%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x"
,
context
.
digest
[
0
],
sprintf
(
buf
,
"%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x"
,
context
.
digest
[
0
],
context
.
digest
[
1
],
context
.
digest
[
2
],
context
.
digest
[
3
],
context
.
digest
[
4
],
context
.
digest
[
5
],
context
.
digest
[
6
],
context
.
digest
[
7
],
context
.
digest
[
8
],
context
.
digest
[
9
],
context
.
digest
[
10
],
context
.
digest
[
11
],
context
.
digest
[
12
],
context
.
digest
[
13
],
context
.
digest
[
14
],
context
.
digest
[
15
]);
memcpy
(
target
,
buf
,
TSDB_PASSWORD_LEN
);
}
#ifdef __cplusplus
...
...
source/client/inc/clientInt.h
浏览文件 @
21658047
...
...
@@ -46,6 +46,8 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 0
enum
{
RES_TYPE__QUERY
=
1
,
RES_TYPE__TMQ
,
...
...
@@ -183,7 +185,9 @@ typedef struct SReqResultInfo {
typedef
struct
SRequestSendRecvBody
{
tsem_t
rspSem
;
// not used now
void
*
fp
;
__taos_async_fn_t
queryFp
;
__taos_async_fn_t
fetchFp
;
void
*
param
;
SDataBuf
requestMsg
;
int64_t
queryJob
;
// query job, created according to sql query DAG.
struct
SQueryPlan
*
pDag
;
// the query dag, generated according to the sql statement.
...
...
@@ -219,13 +223,21 @@ typedef struct SRequestObj {
SRequestSendRecvBody
body
;
}
SRequestObj
;
typedef
struct
SSyncQueryParam
{
tsem_t
sem
;
SRequestObj
*
pRequest
;
}
SSyncQueryParam
;
void
*
doAsyncFetchRow
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
*
doFetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
);
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
,
bool
freeAfterUse
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
);
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
);
static
FORCE_INLINE
SReqResultInfo
*
tmqGetCurResInfo
(
TAOS_RES
*
res
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
...
...
@@ -271,7 +283,7 @@ int32_t releaseTscObj(int64_t rid);
uint64_t
generateRequestId
();
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
*
createRequest
(
STscObj
*
pObj
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
SRequestObj
*
acquireRequest
(
int64_t
rid
);
int32_t
releaseRequest
(
int64_t
rid
);
...
...
@@ -319,9 +331,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void
hbMgrInitMqHbRspHandle
();
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
,
void
**
res
);
int32_t
getQueryPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SArray
**
pNodeList
);
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
bool
keepQuery
,
void
**
res
);
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
);
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
);
...
...
source/client/src/clientEnv.c
浏览文件 @
21658047
...
...
@@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "catalog.h"
#include "functionMgt.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tcache.h"
...
...
@@ -171,7 +172,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t
releaseTscObj
(
int64_t
rid
)
{
return
taosReleaseRef
(
clientConnRefPool
,
rid
);
}
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
)
{
void
*
createRequest
(
STscObj
*
pObj
,
void
*
param
,
int32_t
type
)
{
assert
(
pObj
!=
NULL
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SRequestObj
));
...
...
@@ -185,9 +186,10 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
pRequest
->
requestId
=
generateRequestId
();
pRequest
->
metric
.
start
=
taosGetTimestampUs
();
pRequest
->
body
.
param
=
param
;
pRequest
->
type
=
type
;
pRequest
->
pTscObj
=
pObj
;
pRequest
->
body
.
fp
=
fp
;
// not used it yet
pRequest
->
msgBuf
=
taosMemoryCalloc
(
1
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
pRequest
->
msgBufLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
;
tsem_init
(
&
pRequest
->
body
.
rspSem
,
0
,
0
);
...
...
@@ -288,6 +290,7 @@ void taos_init_imp(void) {
taosSetCoreDump
(
true
);
initTaskQueue
();
fmFuncMgtInit
();
clientConnRefPool
=
taosOpenRef
(
200
,
destroyTscObj
);
clientReqRefPool
=
taosOpenRef
(
40960
,
doDestroyRequest
);
...
...
source/client/src/clientImpl.c
浏览文件 @
21658047
...
...
@@ -133,7 +133,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
}
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
)
{
*
pRequest
=
createRequest
(
pTscObj
,
NULL
,
NULL
,
TSDB_SQL_SELECT
);
*
pRequest
=
createRequest
(
pTscObj
,
NULL
,
TSDB_SQL_SELECT
);
if
(
*
pRequest
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -192,6 +192,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResPrecision
(
&
pRequest
->
body
.
resInfo
,
(
*
pQuery
)
->
precision
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
||
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
TSWAP
(
pRequest
->
dbList
,
(
*
pQuery
)
->
pDbList
);
TSWAP
(
pRequest
->
tableList
,
(
*
pQuery
)
->
pTableList
);
...
...
@@ -230,6 +231,29 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return
TSDB_CODE_SUCCESS
;
}
static
SAppInstInfo
*
getAppInfo
(
SRequestObj
*
pRequest
)
{
return
pRequest
->
pTscObj
->
pAppInfo
;
}
int32_t
asyncExecDdlQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
)
{
// drop table if exists not_exists_table
if
(
NULL
==
pQuery
->
pCmdMsg
)
{
return
TSDB_CODE_SUCCESS
;
}
SCmdMsgInfo
*
pMsgInfo
=
pQuery
->
pCmdMsg
;
pRequest
->
type
=
pMsgInfo
->
msgType
;
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
pMsgInfo
->
pMsg
,
.
len
=
pMsgInfo
->
msgLen
,
.
handle
=
NULL
};
pMsgInfo
->
pMsg
=
NULL
;
// pMsg transferred to SMsgSendInfo management
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
SMsgSendInfo
*
pSendMsg
=
buildMsgInfoImpl
(
pRequest
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pAppInfo
->
pTransporter
,
&
pMsgInfo
->
epSet
,
&
transporterId
,
pSendMsg
);
return
TSDB_CODE_SUCCESS
;
}
int
compareQueryNodeLoad
(
const
void
*
elem1
,
const
void
*
elem2
)
{
SQueryNodeLoad
*
node1
=
(
SQueryNodeLoad
*
)
elem1
;
SQueryNodeLoad
*
node2
=
(
SQueryNodeLoad
*
)
elem2
;
...
...
@@ -286,9 +310,11 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
**
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
p
Request
->
pTscObj
->
p
AppInfo
->
mgmtEp
),
.
mgmtEpSet
=
getEpSet_s
(
&
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
...
...
@@ -298,6 +324,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
*
pNodeList
);
}
return
code
;
}
...
...
@@ -352,7 +379,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
pRequest
->
body
.
resInfo
.
execRes
=
res
.
res
;
while
(
true
)
{
while
(
true
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -388,7 +415,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
)
{
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{
.
code
=
0
,
.
numOfRows
=
0
};
SQueryResult
res
=
{
0
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
...
...
@@ -417,10 +444,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return
pRequest
->
code
;
}
int32_t
getQueryPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SArray
**
pNodeList
)
{
return
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
}
int32_t
handleSubmitExecRes
(
SRequestObj
*
pRequest
,
void
*
res
,
SCatalog
*
pCatalog
,
SEpSet
*
epset
)
{
int32_t
code
=
0
;
SArray
*
pArray
=
NULL
;
...
...
@@ -428,19 +451,19 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
if
(
pRsp
->
nBlocks
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pArray
=
taosArrayInit
(
pRsp
->
nBlocks
,
sizeof
(
STbSVersion
));
if
(
NULL
==
pArray
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
nBlocks
;
++
i
)
{
SSubmitBlkRsp
*
blk
=
pRsp
->
pBlocks
+
i
;
if
(
NULL
==
blk
->
tblFName
||
0
==
blk
->
tblFName
[
0
])
{
continue
;
}
STbSVersion
tbSver
=
{.
tbFName
=
blk
->
tblFName
,
.
sver
=
blk
->
sver
};
taosArrayPush
(
pArray
,
&
tbSver
);
}
...
...
@@ -450,7 +473,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
_return:
taosArrayDestroy
(
pArray
);
return
code
;
return
code
;
}
int32_t
handleQueryExecRes
(
SRequestObj
*
pRequest
,
void
*
res
,
SCatalog
*
pCatalog
,
SEpSet
*
epset
)
{
...
...
@@ -461,13 +484,13 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog,
if
(
tbNum
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pArray
=
taosArrayInit
(
tbNum
,
sizeof
(
STbSVersion
));
if
(
NULL
==
pArray
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
tbNum
;
++
i
)
{
STbVerInfo
*
tbInfo
=
taosArrayGet
(
pTbArray
,
i
);
STbSVersion
tbSver
=
{.
tbFName
=
tbInfo
->
tbFName
,
.
sver
=
tbInfo
->
sversion
,
.
tver
=
tbInfo
->
tversion
};
...
...
@@ -479,7 +502,7 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog,
_return:
taosArrayDestroy
(
pArray
);
return
code
;
return
code
;
}
int32_t
handleAlterTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
)
{
...
...
@@ -490,19 +513,19 @@ int32_t handleExecRes(SRequestObj* pRequest) {
if
(
NULL
==
pRequest
->
body
.
resInfo
.
execRes
.
res
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
0
;
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
)
{
return
code
;
}
SEpSet
epset
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SQueryExecRes
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
switch
(
pRes
->
msgType
)
{
case
TDMT_VND_ALTER_TABLE
:
case
TDMT_VND_ALTER_TABLE
:
case
TDMT_MND_ALTER_STB
:
{
code
=
handleAlterTbExecRes
(
pRes
->
res
,
pCatalog
);
break
;
...
...
@@ -510,7 +533,7 @@ int32_t handleExecRes(SRequestObj* pRequest) {
case
TDMT_VND_SUBMIT
:
{
code
=
handleSubmitExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
}
}
case
TDMT_VND_QUERY
:
{
code
=
handleQueryExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
...
...
@@ -523,30 +546,37 @@ int32_t handleExecRes(SRequestObj* pRequest) {
return
code
;
}
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
,
void
**
res
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
code
=
execLocalCmd
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_RPC
:
code
=
execDdlQuery
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_SCHEDULE
:
{
SArray
*
pNodeList
=
NULL
;
code
=
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
&
pNodeList
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
);
}
taosArrayDestroy
(
pNodeList
);
break
;
void
schedulerExecCb
(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
// return to client
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
bool
keepQuery
,
void
**
res
)
{
int32_t
code
=
0
;
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
code
=
execLocalCmd
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_RPC
:
code
=
execDdlQuery
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_SCHEDULE
:
{
SArray
*
pNodeList
=
NULL
;
code
=
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
&
pNodeList
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
);
}
case
QUERY_EXEC_MODE_EMPTY_RESULT
:
pRequest
->
type
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
break
;
default:
break
;
taosArrayDestroy
(
pNodeList
);
break
;
}
case
QUERY_EXEC_MODE_EMPTY_RESULT
:
pRequest
->
type
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
break
;
default:
break
;
}
if
(
!
keepQuery
)
{
...
...
@@ -583,7 +613,70 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return
pRequest
;
}
return
launchQueryImpl
(
pRequest
,
pQuery
,
code
,
false
,
NULL
);
return
launchQueryImpl
(
pRequest
,
pQuery
,
false
,
NULL
);
}
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
)
{
void
*
pRes
=
NULL
;
int32_t
code
=
0
;
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
code
=
execLocalCmd
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_RPC
:
code
=
asyncExecDdlQuery
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_SCHEDULE
:
{
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qCreateQueryPlan
(
&
cxt
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
schedulerAsyncExecJob
(
pAppInfo
->
pTransporter
,
pNodeList
,
pRequest
->
body
.
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
schedulerExecCb
,
pRequest
);
// if (NULL != pRes) {
// code = validateSversion(pRequest, pRes);
// }
}
taosArrayDestroy
(
pNodeList
);
break
;
}
case
QUERY_EXEC_MODE_EMPTY_RESULT
:
pRequest
->
type
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
break
;
default:
break
;
}
// if (!keepQuery) {
// qDestroyQuery(pQuery);
// }
if
(
NULL
!=
pRequest
&&
TSDB_CODE_SUCCESS
!=
code
)
{
pRequest
->
code
=
terrno
;
}
// if (res) {
// *res = pRes;
// } else {
// freeRequestRes(pRequest, pRes);
// pRes = NULL;
}
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
)
{
...
...
@@ -666,17 +759,6 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return
pRequest
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
return
execQuery
(
pTscObj
,
sql
,
sqlLen
);
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
...
...
@@ -726,7 +808,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
pTscObj
;
}
SRequestObj
*
pRequest
=
createRequest
(
pTscObj
,
fp
,
param
,
TDMT_MND_CONNECT
);
SRequestObj
*
pRequest
=
createRequest
(
pTscObj
,
param
,
TDMT_MND_CONNECT
);
if
(
pRequest
==
NULL
)
{
destroyTscObj
(
pTscObj
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -843,8 +925,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
info
.
ahandle
;
assert
(
pMsg
->
info
.
ahandle
!=
NULL
);
SRequestObj
*
pRequest
=
NULL
;
STscObj
*
pTscObj
=
NULL
;
STscObj
*
pTscObj
=
NULL
;
if
(
pSendInfo
->
requestObjRefId
!=
0
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
taosAcquireRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
...
...
@@ -944,7 +1025,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
}
}
void
*
do
Async
FetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
)
{
void
*
doFetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
)
{
assert
(
pRequest
!=
NULL
);
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
...
...
@@ -955,17 +1036,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return
NULL
;
}
tsem_init
(
&
schdRspSem
,
0
,
0
);
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
SSchdFetchParam
param
=
{.
pData
=
(
void
**
)
&
pResInfo
->
pData
,
.
code
=
&
pRequest
->
code
};
pRequest
->
code
=
schedulerAsyncFetchRows
(
pRequest
->
body
.
queryJob
,
schdFetchCallback
,
&
param
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
return
NULL
;
}
tsem_wait
(
&
schdRspSem
);
pRequest
->
code
=
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
return
NULL
;
...
...
@@ -994,8 +1066,12 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return
pResultInfo
->
row
;
}
void
*
doFetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
)
{
// return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
static
void
syncFetchFn
(
void
*
param
,
TAOS_RES
*
res
,
int32_t
numOfRows
)
{
SSyncQueryParam
*
pParam
=
param
;
tsem_post
(
&
pParam
->
sem
);
}
void
*
doAsyncFetchRow
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
)
{
assert
(
pRequest
!=
NULL
);
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
...
...
@@ -1006,29 +1082,12 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
return
NULL
;
}
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
pRequest
->
code
=
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
return
NULL
;
}
pRequest
->
code
=
setQueryResultFromRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pResInfo
->
pData
,
convertUcs4
,
true
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
return
NULL
;
}
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pResInfo
->
numOfRows
,
pResInfo
->
totalRows
,
pResInfo
->
completed
,
pRequest
->
requestId
);
if
(
pResultInfo
->
numOfRows
==
0
)
{
return
NULL
;
}
SSyncQueryParam
*
pParam
=
pRequest
->
body
.
param
;
taos_fetch_rows_a
(
pRequest
,
syncFetchFn
,
pParam
);
tsem_wait
(
&
pParam
->
sem
);
}
if
(
setupOneRowPtr
)
{
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
&&
setupOneRowPtr
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
}
...
...
source/client/src/clientMain.c
浏览文件 @
21658047
...
...
@@ -175,12 +175,39 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return
pResInfo
->
userFields
;
}
static
void
syncQueryFn
(
void
*
param
,
void
*
res
,
int32_t
code
)
{
SSyncQueryParam
*
pParam
=
param
;
pParam
->
pRequest
=
res
;
pParam
->
pRequest
->
code
=
code
;
tsem_post
(
&
pParam
->
sem
);
}
TAOS_RES
*
taos_query
(
TAOS
*
taos
,
const
char
*
sql
)
{
if
(
taos
==
NULL
||
sql
==
NULL
)
{
return
NULL
;
}
return
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
struct
SSyncQueryParam
));
tsem_init
(
&
param
->
sem
,
0
,
0
);
taos_query_a
(
pTscObj
,
sql
,
syncQueryFn
,
param
);
tsem_wait
(
&
param
->
sem
);
return
param
->
pRequest
;
#else
size_t
sqlLen
=
strlen
(
sql
);
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
return
execQuery
(
pTscObj
,
sql
,
sqlLen
);
#endif
}
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
)
{
...
...
@@ -195,7 +222,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
NULL
;
}
#if SYNC_ON_TOP_OF_ASYNC
return
doAsyncFetchRow
(
pRequest
,
true
,
true
);
#else
return
doFetchRows
(
pRequest
,
true
,
true
);
#endif
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
...
...
@@ -205,6 +236,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
else
{
pResultInfo
=
tmqGetCurResInfo
(
res
);
}
if
(
pResultInfo
->
current
<
pResultInfo
->
numOfRows
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
...
...
@@ -563,38 +595,195 @@ const char *taos_get_server_info(TAOS *taos) {
return
pTscObj
->
ver
;
}
typedef
struct
SqlParseWrapper
{
SParseContext
*
pCtx
;
SCatalogReq
catalogReq
;
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
}
SqlParseWrapper
;
void
retrieveMetaCallback
(
SMetaData
*
pResultMeta
,
void
*
param
,
int32_t
code
)
{
SqlParseWrapper
*
pWrapper
=
(
SqlParseWrapper
*
)
param
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
qAnalyseSqlSemantic
(
pWrapper
->
pCtx
,
&
pWrapper
->
catalogReq
,
pResultMeta
,
pQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
pQuery
->
haveResultSet
)
{
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pQuery
->
pResSchema
,
(
pQuery
)
->
numOfResCols
);
setResPrecision
(
&
pRequest
->
body
.
resInfo
,
(
pQuery
)
->
precision
);
}
TSWAP
(
pRequest
->
dbList
,
(
pQuery
)
->
pDbList
);
TSWAP
(
pRequest
->
tableList
,
(
pQuery
)
->
pTableList
);
taosMemoryFree
(
pWrapper
);
launchAsyncQuery
(
pRequest
,
pQuery
);
return
;
_error:
taosMemoryFree
(
pWrapper
);
tscError
(
"0x%"
PRIx64
" error occurs, code:%s, return to user app, reqId:%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
pRequest
->
code
=
code
;
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
// todo add retry before return user's callback
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
)
{
ASSERT
(
fp
!=
NULL
);
if
(
taos
==
NULL
||
sql
==
NULL
)
{
fp
(
param
,
NULL
,
TSDB_CODE_INVALID_PARA
);
terrno
=
TSDB_CODE_INVALID_PARA
;
fp
(
param
,
NULL
,
terrno
);
return
;
}
SRequestObj
*
pRequest
=
NULL
;
size_t
sqlLen
=
strlen
(
sql
);
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
fp
(
param
,
NULL
,
terrno
);
return
;
}
SRequestObj
*
pRequest
=
NULL
;
int32_t
retryNum
=
0
;
int32_t
code
=
0
;
size_t
sqlLen
=
strlen
(
sql
);
// while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
code
=
buildRequest
(
taos
,
sql
,
sqlLen
,
&
pRequest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
fp
(
param
,
NULL
,
code
);
return
;
}
while
(
retryNum
++
<
REQUEST_MAX_TRY_TIMES
)
{
pRequest
=
launchQuery
(
taos
,
sql
,
sqlLen
);
if
(
pRequest
==
NULL
||
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
!
NEED_CLIENT_HANDLE_ERROR
(
pRequest
->
code
))
{
break
;
}
pRequest
->
body
.
queryFp
=
fp
;
pRequest
->
body
.
param
=
param
;
code
=
refreshMeta
(
taos
,
pRequest
);
if
(
code
)
{
pRequest
->
code
=
code
;
break
;
}
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
destroyRequest
(
pRequest
);
SParseContext
*
pCxt
=
taosMemoryCalloc
(
1
,
sizeof
(
SParseContext
));
*
pCxt
=
(
SParseContext
){.
requestId
=
pRequest
->
requestId
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
pRequest
->
pDb
,
.
topicQuery
=
false
,
.
pSql
=
pRequest
->
sqlstr
,
.
sqlLen
=
pRequest
->
sqlLen
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pTransporter
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
pStmtCb
=
NULL
,
.
pUser
=
pTscObj
->
user
,
.
isSuperUser
=
(
0
==
strcmp
(
pTscObj
->
user
,
TSDB_DEFAULT_USER
)),
.
async
=
true
,};
pCxt
->
mgmtEpSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCxt
->
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SQuery
*
pQuery
=
NULL
;
SCatalogReq
catalogReq
=
{
0
};
code
=
qParseSqlSyntax
(
pCxt
,
&
pQuery
,
&
catalogReq
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SqlParseWrapper
*
pWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SqlParseWrapper
));
pWrapper
->
pCtx
=
pCxt
;
pWrapper
->
pQuery
=
pQuery
;
pWrapper
->
pRequest
=
pRequest
;
pWrapper
->
catalogReq
=
catalogReq
;
code
=
catalogAsyncGetAllMeta
(
pCxt
->
pCatalog
,
pCxt
->
pTransporter
,
&
pCxt
->
mgmtEpSet
,
pRequest
->
requestId
,
&
catalogReq
,
retrieveMetaCallback
,
pWrapper
,
&
pRequest
->
body
.
queryJob
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
;
// todo handle the retry process
// if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
// TSWAP(pRequest->dbList, (pQuery)->pDbList);
// TSWAP(pRequest->tableList, (pQuery)->pTableList);
// }
_error:
terrno
=
code
;
pRequest
->
code
=
code
;
fp
(
param
,
pRequest
,
code
);
}
static
void
fetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
pResultInfo
->
pData
=
pResult
;
pResultInfo
->
numOfRows
=
0
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
pRequest
->
body
.
fetchFp
(
pRequest
->
body
.
param
,
pRequest
,
0
);
return
;
}
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
pRequest
->
body
.
fetchFp
(
pRequest
->
body
.
param
,
pRequest
,
0
);
}
pRequest
->
code
=
setQueryResultFromRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pResultInfo
->
pData
,
true
,
false
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
pRequest
->
code
=
code
;
pRequest
->
body
.
fetchFp
(
pRequest
->
body
.
param
,
pRequest
,
0
);
}
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pResultInfo
->
numOfRows
,
pResultInfo
->
totalRows
,
pResultInfo
->
completed
,
pRequest
->
requestId
);
pRequest
->
body
.
fetchFp
(
pRequest
->
body
.
param
,
pRequest
,
pResultInfo
->
numOfRows
);
}
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
ASSERT
(
res
!=
NULL
&&
fp
!=
NULL
);
SRequestObj
*
pRequest
=
res
;
pRequest
->
body
.
fetchFp
=
fp
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
if
(
taos_num_fields
(
pRequest
)
==
0
)
{
pResultInfo
->
numOfRows
=
0
;
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
return
;
}
if
(
pResultInfo
->
pData
==
NULL
||
pResultInfo
->
current
>=
pResultInfo
->
numOfRows
)
{
// All data has returned to App already, no need to try again
if
(
pResultInfo
->
completed
)
{
pResultInfo
->
numOfRows
=
0
;
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
return
;
}
}
schedulerAsyncFetchRows
(
pRequest
->
body
.
queryJob
,
fetchCallback
,
pRequest
);
}
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
21658047
...
...
@@ -33,7 +33,11 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
setErrno
(
pRequest
,
code
);
taosMemoryFree
(
pMsg
->
pData
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
code
;
}
...
...
@@ -117,7 +121,12 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
}
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
code
;
}
...
...
@@ -146,7 +155,13 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
pMsg
->
pData
);
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
pRequest
->
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
code
;
}
...
...
@@ -185,7 +200,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
setConnectionDB
(
pRequest
->
pTscObj
,
db
);
taosMemoryFree
(
pMsg
->
pData
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
pRequest
->
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
0
;
}
...
...
@@ -196,11 +216,13 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
taosMemoryFree
(
pMsg
->
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
return
code
;
}
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
code
;
}
...
...
@@ -208,18 +230,20 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj
*
pRequest
=
param
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
return
code
;
}
SDropDbRsp
dropdbRsp
=
{
0
};
tDeserializeSDropDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
dropdbRsp
);
}
else
{
SDropDbRsp
dropdbRsp
=
{
0
};
tDeserializeSDropDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
dropdbRsp
);
struct
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveDB
(
pCatalog
,
dropdbRsp
.
db
,
dropdbRsp
.
uid
);
struct
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveDB
(
pCatalog
,
dropdbRsp
.
db
,
dropdbRsp
.
uid
);
}
tsem_post
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
}
return
code
;
}
...
...
@@ -245,6 +269,7 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
// todo refactor: this arraylist is too large
void
initMsgHandleFp
()
{
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
processConnectRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
processCreateDbRsp
;
...
...
source/client/src/clientSml.c
浏览文件 @
21658047
...
...
@@ -2241,7 +2241,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
info
->
cost
.
insertRpcTime
=
taosGetTimestampUs
();
launchQueryImpl
(
info
->
pRequest
,
info
->
pQuery
,
TSDB_CODE_SUCCESS
,
true
,
NULL
);
launchQueryImpl
(
info
->
pRequest
,
info
->
pQuery
,
true
,
NULL
);
info
->
affectedRows
=
taos_affected_rows
(
info
->
pRequest
);
return
info
->
pRequest
->
code
;
...
...
@@ -2362,7 +2362,7 @@ static int32_t isSchemalessDb(SSmlHandle* info){
*/
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
if
(
!
request
){
uError
(
"SML:taos_schemaless_insert error request is null"
);
return
NULL
;
...
...
source/client/src/clientStmt.c
浏览文件 @
21658047
...
...
@@ -771,11 +771,10 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_EXECUTE
));
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
launchQueryImpl
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
TSDB_CODE_SUCCESS
,
true
,
NULL
);
launchQueryImpl
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
true
,
NULL
);
}
else
{
STMT_ERR_RET
(
qBuildStmtOutput
(
pStmt
->
sql
.
pQuery
,
pStmt
->
exec
.
pVgHash
,
pStmt
->
exec
.
pBlockHash
));
launchQueryImpl
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
TSDB_CODE_SUCCESS
,
true
,
(
autoCreateTbl
?
(
void
**
)
&
pRsp
:
NULL
));
launchQueryImpl
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
true
,
(
autoCreateTbl
?
(
void
**
)
&
pRsp
:
NULL
));
}
if
(
pStmt
->
exec
.
pRequest
->
code
&&
NEED_CLIENT_HANDLE_ERROR
(
pStmt
->
exec
.
pRequest
->
code
))
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
21658047
...
...
@@ -13,10 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <gtest/gtest.h>
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "clientInt.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
...
...
@@ -24,7 +26,6 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
namespace
{
...
...
@@ -41,6 +42,47 @@ void showDB(TAOS* pConn) {
printf
(
"%s
\n
"
,
str
);
}
}
void
fetchCallback
(
void
*
param
,
void
*
res
,
int32_t
numOfRow
)
{
printf
(
"numOfRow = %d
\n
"
,
numOfRow
);
int
numFields
=
taos_num_fields
(
res
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
TAOS
*
_taos
=
(
TAOS
*
)
param
;
if
(
numOfRow
>
0
)
{
for
(
int
i
=
0
;
i
<
numOfRow
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
char
temp
[
256
]
=
{
0
};
taos_print_row
(
temp
,
row
,
fields
,
numFields
);
printf
(
"%s
\n
"
,
temp
);
}
taos_fetch_rows_a
(
res
,
fetchCallback
,
_taos
);
}
else
{
printf
(
"no more data, close the connection.
\n
"
);
// taos_free_result(res);
// taos_close(_taos);
// taos_cleanup();
}
}
void
queryCallback
(
void
*
param
,
void
*
res
,
int32_t
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to execute, reason:%s
\n
"
,
taos_errstr
(
res
));
}
printf
(
"start to fetch data
\n
"
);
taos_fetch_rows_a
(
res
,
fetchCallback
,
param
);
}
void
queryCallback1
(
void
*
param
,
void
*
res
,
int32_t
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to execute, reason:%s
\n
"
,
taos_errstr
(
res
));
}
taos_free_result
(
res
);
printf
(
"exec query:
\n
"
);
taos_query_a
(
param
,
"select * from tm1"
,
queryCallback
,
param
);
}
}
// namespace
int
main
(
int
argc
,
char
**
argv
)
{
...
...
@@ -52,7 +94,7 @@ TEST(testCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
}
#if 0
TEST
(
testCase
,
connect_Test
)
{
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
...
...
@@ -62,7 +104,7 @@ TEST(testCase, connect_Test) {
}
taos_close
(
pConn
);
}
#if 0
TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -432,7 +474,7 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
pRes = taos_query(pConn, "create table
if not exists
t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
...
...
@@ -440,7 +482,7 @@ TEST(testCase, create_multiple_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_3 using st1 tags(2)");
pRes = taos_query(pConn, "create table
if not exists
t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
...
...
@@ -480,9 +522,7 @@ TEST(testCase, show_table_Test) {
TAOS_RES* pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
}
taos_free_result(pRes);
taos_query(pConn, "use abc1");
...
...
@@ -550,6 +590,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash);
}
TEST(testCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
...
...
@@ -567,7 +608,6 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
...
@@ -606,7 +646,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
for
(
int32_t
i
=
0
;
i
<
1000
000
;
i
+=
20
)
{
for(int32_t i = 0; i < 1000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
...
@@ -664,7 +704,7 @@ TEST(testCase, projection_query_tables) {
// taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
...
...
@@ -705,7 +745,7 @@ TEST(testCase, agg_query_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "
explain analyze select count(*) from tu interval(1s)
");
pRes = taos_query(pConn, "
show stables
");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
...
...
@@ -735,4 +775,24 @@ TEST(testCase, agg_query_tables) {
}
#endif
/*
--- copy the following script in the shell to setup the environment ---
create database test;
use test;
create table m1(ts timestamp, k int) tags(a int);
create table tm0 using m1 tags(1);
create table tm1 using m1 tags(2);
insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22);
*/
TEST
(
testCase
,
async_api_test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
taos_query_a
(
pConn
,
"drop table test.tm0"
,
queryCallback
,
pConn
);
getchar
();
taos_close
(
pConn
);
}
#pragma GCC diagnostic pop
source/client/test/smlTest.cpp
浏览文件 @
21658047
...
...
@@ -486,7 +486,7 @@ TEST(testCase, smlProcess_influx_Test) {
pRes
=
taos_query
(
taos
,
"use inflx_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -607,7 +607,7 @@ TEST(testCase, smlParseLine_error_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -656,7 +656,7 @@ TEST(testCase, smlProcess_telnet_Test) {
pRes
=
taos_query
(
taos
,
"use telnet_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -710,7 +710,7 @@ TEST(testCase, smlProcess_json1_Test) {
pRes
=
taos_query
(
taos
,
"use json_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -779,7 +779,7 @@ TEST(testCase, smlProcess_json2_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -823,7 +823,7 @@ TEST(testCase, smlProcess_json3_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -895,7 +895,7 @@ TEST(testCase, smlProcess_json4_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -957,7 +957,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1006,7 +1006,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1033,7 +1033,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1101,7 +1101,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1146,7 +1146,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1191,7 +1191,7 @@ TEST(testCase, sml_TD15662_Test) {
pRes
=
taos_query
(
taos
,
"use db_15662"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_MILLI_SECONDS
);
...
...
@@ -1218,7 +1218,7 @@ TEST(testCase, sml_TD15735_Test) {
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
...
...
@@ -1244,7 +1244,7 @@ TEST(testCase, sml_TD15742_Test) {
pRes
=
taos_query
(
taos
,
"use TD15742"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_MILLI_SECONDS
);
...
...
source/common/src/systable.c
浏览文件 @
21658047
...
...
@@ -78,7 +78,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.
name
=
"replica"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"strict"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"duration"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"keep"
,
.
bytes
=
24
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"keep"
,
.
bytes
=
32
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"buffer"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"pagesize"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"pages"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
21658047
...
...
@@ -538,7 +538,7 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
isAgg
,
false
);
char
b3
[
TSDB_TYPE_STR_MAX_LEN
]
=
{
0
};
char
b3
[
TSDB_TYPE_STR_MAX_LEN
+
1
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
b3
,
mnodeGenTypeStr
(
buf
,
TSDB_TYPE_STR_MAX_LEN
,
pFunc
->
outputType
,
pFunc
->
outputLen
),
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
...
...
source/dnode/mnode/impl/test/acct/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
if
(
NOT TD_WINDOWS
)
add_test
(
NAME acctTest
COMMAND acctTest
)
endif
(
NOT TD_WINDOWS
)
add_test
(
NAME acctTest
COMMAND acctTest
)
source/dnode/mnode/impl/test/func/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -5,9 +5,9 @@ target_link_libraries(
PUBLIC sut
)
if
(
NOT TD_WINDOWS
)
#
if(NOT TD_WINDOWS)
add_test
(
NAME funcTest
COMMAND funcTest
)
endif
(
NOT TD_WINDOWS
)
#
endif(NOT TD_WINDOWS)
source/dnode/mnode/impl/test/profile/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -5,9 +5,9 @@ target_link_libraries(
PUBLIC sut
)
if
(
NOT TD_WINDOWS
)
#
if(NOT TD_WINDOWS)
add_test
(
NAME profileTest
COMMAND profileTest
)
endif
(
NOT TD_WINDOWS
)
#
endif(NOT TD_WINDOWS)
source/dnode/mnode/impl/test/show/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -5,9 +5,9 @@ target_link_libraries(
PUBLIC sut
)
if
(
NOT TD_WINDOWS
)
#
if(NOT TD_WINDOWS)
add_test
(
NAME showTest
COMMAND showTest
)
endif
(
NOT TD_WINDOWS
)
#
endif(NOT TD_WINDOWS)
source/libs/catalog/inc/catalogInt.h
浏览文件 @
21658047
...
...
@@ -490,7 +490,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
int32_t
ctgGetTbMetaFromMnode
(
CTG_PARAMS
,
const
SName
*
pTableName
,
STableMetaOutput
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbMetaFromVnode
(
CTG_PARAMS
,
const
SName
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgInitJob
(
CTG_PARAMS
,
SCtgJob
**
job
,
uint64_t
reqId
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
void
*
param
);
int32_t
ctgInitJob
(
CTG_PARAMS
,
SCtgJob
**
job
,
uint64_t
reqId
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
void
*
param
,
int32_t
*
taskNum
);
int32_t
ctgLaunchJob
(
SCtgJob
*
pJob
);
int32_t
ctgMakeAsyncRes
(
SCtgJob
*
pJob
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
21658047
...
...
@@ -1028,16 +1028,21 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
int32_t
code
=
0
;
int32_t
code
=
0
,
taskNum
=
0
;
SCtgJob
*
pJob
=
NULL
;
CTG_ERR_JRET
(
ctgInitJob
(
CTG_PARAMS_LIST
(),
&
pJob
,
reqId
,
pReq
,
fp
,
param
));
CTG_ERR_JRET
(
ctgInitJob
(
CTG_PARAMS_LIST
(),
&
pJob
,
reqId
,
pReq
,
fp
,
param
,
&
taskNum
));
if
(
taskNum
<=
0
)
{
SMetaData
*
pMetaData
=
taosMemoryCalloc
(
1
,
sizeof
(
SMetaData
));
fp
(
pMetaData
,
param
,
TSDB_CODE_SUCCESS
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
CTG_ERR_JRET
(
ctgLaunchJob
(
pJob
));
*
jobId
=
pJob
->
refId
;
// NOTE: here the assignment of jobId is invalid, may over-write the true scheduler created query job.
// *jobId = pJob->refId;
_return:
if
(
pJob
)
{
taosReleaseRef
(
gCtgMgmt
.
jobPool
,
pJob
->
refId
);
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
21658047
...
...
@@ -233,7 +233,7 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
}
int32_t
ctgInitJob
(
CTG_PARAMS
,
SCtgJob
**
job
,
uint64_t
reqId
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
void
*
param
)
{
int32_t
ctgInitJob
(
CTG_PARAMS
,
SCtgJob
**
job
,
uint64_t
reqId
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
void
*
param
,
int32_t
*
taskNum
)
{
int32_t
code
=
0
;
int32_t
tbMetaNum
=
(
int32_t
)
taosArrayGetSize
(
pReq
->
pTableMeta
);
int32_t
dbVgNum
=
(
int32_t
)
taosArrayGetSize
(
pReq
->
pDbVgroup
);
...
...
@@ -245,15 +245,15 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t
userNum
=
(
int32_t
)
taosArrayGetSize
(
pReq
->
pUser
);
int32_t
dbInfoNum
=
(
int32_t
)
taosArrayGetSize
(
pReq
->
pDbInfo
);
int32_t
taskNum
=
tbMetaNum
+
dbVgNum
+
udfNum
+
tbHashNum
+
qnodeNum
+
dbCfgNum
+
indexNum
+
userNum
+
dbInfoNum
;
if
(
taskNum
<=
0
)
{
ctgError
(
"
empty input for job, taskNum:%d"
,
taskNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
*
taskNum
=
tbMetaNum
+
dbVgNum
+
udfNum
+
tbHashNum
+
qnodeNum
+
dbCfgNum
+
indexNum
+
userNum
+
dbInfoNum
;
if
(
*
taskNum
<=
0
)
{
ctgError
(
"
Empty input for job, no need to retrieve meta, reqId:0x%"
PRIx64
,
reqId
);
return
TSDB_CODE_SUCCESS
;
}
*
job
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgJob
));
if
(
NULL
==
*
job
)
{
ctgError
(
"
calloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgJob
)
);
ctgError
(
"
failed to calloc, size:%d, reqId:0x%"
PRIx64
,
(
int32_t
)
sizeof
(
SCtgJob
),
reqId
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
...
...
@@ -275,52 +275,52 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob
->
indexNum
=
indexNum
;
pJob
->
userNum
=
userNum
;
pJob
->
dbInfoNum
=
dbInfoNum
;
pJob
->
pTasks
=
taosArrayInit
(
taskNum
,
sizeof
(
SCtgTask
));
pJob
->
pTasks
=
taosArrayInit
(
*
taskNum
,
sizeof
(
SCtgTask
));
if
(
NULL
==
pJob
->
pTasks
)
{
ctgError
(
"taosArrayInit %d tasks failed"
,
taskNum
);
ctgError
(
"taosArrayInit %d tasks failed"
,
*
taskNum
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
int32_t
taskIdx
=
0
;
for
(
int32_t
i
=
0
;
i
<
dbVgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbVgroup
,
i
);
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbVgroup
,
i
);
CTG_ERR_JRET
(
ctgInitGetDbVgTask
(
pJob
,
taskIdx
++
,
dbFName
));
}
for
(
int32_t
i
=
0
;
i
<
dbCfgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbCfg
,
i
);
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbCfg
,
i
);
CTG_ERR_JRET
(
ctgInitGetDbCfgTask
(
pJob
,
taskIdx
++
,
dbFName
));
}
for
(
int32_t
i
=
0
;
i
<
dbInfoNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbInfo
,
i
);
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbInfo
,
i
);
CTG_ERR_JRET
(
ctgInitGetDbInfoTask
(
pJob
,
taskIdx
++
,
dbFName
));
}
for
(
int32_t
i
=
0
;
i
<
tbMetaNum
;
++
i
)
{
SName
*
name
=
taosArrayGet
(
pReq
->
pTableMeta
,
i
);
SName
*
name
=
taosArrayGet
(
pReq
->
pTableMeta
,
i
);
CTG_ERR_JRET
(
ctgInitGetTbMetaTask
(
pJob
,
taskIdx
++
,
name
));
}
for
(
int32_t
i
=
0
;
i
<
tbHashNum
;
++
i
)
{
SName
*
name
=
taosArrayGet
(
pReq
->
pTableHash
,
i
);
SName
*
name
=
taosArrayGet
(
pReq
->
pTableHash
,
i
);
CTG_ERR_JRET
(
ctgInitGetTbHashTask
(
pJob
,
taskIdx
++
,
name
));
}
for
(
int32_t
i
=
0
;
i
<
indexNum
;
++
i
)
{
char
*
indexName
=
taosArrayGet
(
pReq
->
pIndex
,
i
);
char
*
indexName
=
taosArrayGet
(
pReq
->
pIndex
,
i
);
CTG_ERR_JRET
(
ctgInitGetIndexTask
(
pJob
,
taskIdx
++
,
indexName
));
}
for
(
int32_t
i
=
0
;
i
<
udfNum
;
++
i
)
{
char
*
udfName
=
taosArrayGet
(
pReq
->
pUdf
,
i
);
char
*
udfName
=
taosArrayGet
(
pReq
->
pUdf
,
i
);
CTG_ERR_JRET
(
ctgInitGetUdfTask
(
pJob
,
taskIdx
++
,
udfName
));
}
for
(
int32_t
i
=
0
;
i
<
userNum
;
++
i
)
{
SUserAuthInfo
*
user
=
taosArrayGet
(
pReq
->
pUser
,
i
);
SUserAuthInfo
*
user
=
taosArrayGet
(
pReq
->
pUser
,
i
);
CTG_ERR_JRET
(
ctgInitGetUserTask
(
pJob
,
taskIdx
++
,
user
));
}
...
...
@@ -328,22 +328,8 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET
(
ctgInitGetQnodeTask
(
pJob
,
taskIdx
++
));
}
pJob
->
refId
=
taosAddRef
(
gCtgMgmt
.
jobPool
,
pJob
);
if
(
pJob
->
refId
<
0
)
{
ctgError
(
"add job to ref failed, error: %s"
,
tstrerror
(
terrno
));
CTG_ERR_JRET
(
terrno
);
}
taosAcquireRef
(
gCtgMgmt
.
jobPool
,
pJob
->
refId
);
qDebug
(
"QID:%"
PRIx64
", job %"
PRIx64
" initialized, task num %d"
,
pJob
->
queryId
,
pJob
->
refId
,
taskNum
);
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
*
job
);
CTG_RET
(
code
);
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
21658047
...
...
@@ -1324,13 +1324,11 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SMinmaxResInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
int32_t
type
=
pCtx
->
input
.
pData
[
0
]
->
info
.
type
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
currentRow
=
pBlock
->
info
.
rows
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
// todo assign the tag value
int32_t
currentRow
=
pBlock
->
info
.
rows
;
pEntryInfo
->
isNullRes
=
(
pEntryInfo
->
numOfRes
==
0
);
if
(
pCol
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
*
(
double
*
)
&
pRes
->
v
;
...
...
@@ -1339,7 +1337,10 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend
(
pCol
,
currentRow
,
(
const
char
*
)
&
pRes
->
v
,
pEntryInfo
->
isNullRes
);
}
setSelectivityValue
(
pCtx
,
pBlock
,
&
pRes
->
tuplePos
,
currentRow
);
if
(
pEntryInfo
->
numOfRes
>
0
)
{
setSelectivityValue
(
pCtx
,
pBlock
,
&
pRes
->
tuplePos
,
currentRow
);
}
return
pEntryInfo
->
numOfRes
;
}
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -92,16 +92,14 @@ target_link_libraries (idxJsonUT
index
)
if
(
NOT TD_WINDOWS
)
add_test
(
NAME idxtest
COMMAND idxTest
)
add_test
(
NAME idxJsonUT
COMMAND idxJsonUT
)
endif
(
NOT TD_WINDOWS
)
add_test
(
NAME idxtest
COMMAND idxTest
)
add_test
(
NAME idxJsonUT
COMMAND idxJsonUT
)
add_test
(
NAME idxUtilUT
COMMAND idxUtilUT
...
...
source/libs/parser/src/parAstParser.c
浏览文件 @
21658047
...
...
@@ -124,6 +124,10 @@ static EDealRes collectMetaKeyFromRealTable(SCollectMetaKeyFromExprCxt* pCxt, SR
pCxt
->
errCode
=
reserveUserAuthInCache
(
pCxt
->
pComCxt
->
pParseCxt
->
acctId
,
pCxt
->
pComCxt
->
pParseCxt
->
pUser
,
pRealTable
->
table
.
dbName
,
AUTH_TYPE_READ
,
pCxt
->
pComCxt
->
pMetaCache
);
}
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
pCxt
->
errCode
=
reserveDbVgInfoInCache
(
pCxt
->
pComCxt
->
pParseCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pCxt
->
pComCxt
->
pMetaCache
);
}
return
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
?
DEAL_RES_CONTINUE
:
DEAL_RES_ERROR
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
21658047
...
...
@@ -2857,7 +2857,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
int32_t
code
=
getDBCfg
(
pCxt
,
pStmt
->
dbName
,
&
dbCfg
);
int32_t
num
=
taosArrayGetSize
(
dbCfg
.
pRetensions
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
num
<
2
)
{
taosArrayDestroy
(
dbCfg
.
pRetensions
);
return
code
;
}
for
(
int32_t
i
=
1
;
i
<
num
;
++
i
)
{
...
...
@@ -5038,9 +5037,6 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext
cxt
=
{
0
};
int32_t
code
=
initTranslateContext
(
pParseCxt
,
pQuery
->
pMetaCache
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
fmFuncMgtInit
();
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteQuery
(
&
cxt
,
pQuery
);
}
...
...
source/libs/parser/test/parSelectTest.cpp
浏览文件 @
21658047
...
...
@@ -33,6 +33,8 @@ TEST_F(ParserSelectTest, basic) {
run
(
"SELECT ts, t.c1 FROM (SELECT * FROM t1) t"
);
run
(
"SELECT * FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1"
);
run
(
"SELECT * FROM st1"
);
}
TEST_F
(
ParserSelectTest
,
constant
)
{
...
...
source/libs/parser/test/parTestMain.cpp
浏览文件 @
21658047
...
...
@@ -35,6 +35,7 @@ namespace ParserTest {
class
ParserEnv
:
public
testing
::
Environment
{
public:
virtual
void
SetUp
()
{
fmFuncMgtInit
();
initMetaDataEnv
();
generateMetaData
();
initLog
(
TD_TMP_DIR_PATH
"td"
);
...
...
source/libs/planner/test/CMakeLists.txt
浏览文件 @
21658047
...
...
@@ -32,9 +32,9 @@ if(${BUILD_WINGETOPT})
target_link_libraries
(
plannerTest PUBLIC wingetopt
)
endif
()
if
(
NOT TD_WINDOWS
)
#
if(NOT TD_WINDOWS)
add_test
(
NAME plannerTest
COMMAND plannerTest
)
endif
(
NOT TD_WINDOWS
)
#
endif(NOT TD_WINDOWS)
source/libs/planner/test/planTestMain.cpp
浏览文件 @
21658047
...
...
@@ -16,6 +16,7 @@
#include <string>
#include <gtest/gtest.h>
#include "functionMgt.h"
#include "getopt.h"
#include "mockCatalog.h"
#include "planTestUtil.h"
...
...
@@ -23,6 +24,7 @@
class
PlannerEnv
:
public
testing
::
Environment
{
public:
virtual
void
SetUp
()
{
fmFuncMgtInit
();
initMetaDataEnv
();
generateMetaData
();
initLog
(
TD_TMP_DIR_PATH
"td"
);
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
21658047
...
...
@@ -718,7 +718,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int16_t
outputType
=
GET_PARAM_TYPE
(
&
pOutput
[
0
]);
int64_t
outputLen
=
GET_PARAM_BYTES
(
&
pOutput
[
0
]);
char
*
outputBuf
=
taosMemoryCalloc
(
outputLen
*
pInput
[
0
].
numOfRows
,
1
);
char
*
outputBuf
=
taosMemoryCalloc
(
outputLen
*
pInput
[
0
].
numOfRows
+
1
,
1
);
char
*
output
=
outputBuf
;
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
...
...
source/libs/scheduler/src/schDbg.c
浏览文件 @
21658047
...
...
@@ -30,13 +30,4 @@ void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) {
tsem_post
(
&
schdRspSem
);
}
void
schdFetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
SSchdFetchParam
*
fParam
=
(
SSchdFetchParam
*
)
param
;
*
fParam
->
pData
=
pResult
;
*
fParam
->
code
=
code
;
tsem_post
(
&
schdRspSem
);
}
source/libs/scheduler/src/schJob.c
浏览文件 @
21658047
...
...
@@ -21,9 +21,9 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
qDebug
(
"release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
...
...
@@ -1365,8 +1365,6 @@ void schFreeJobImpl(void *job) {
int32_t
schExecJobImpl
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
int64_t
startTs
,
bool
sync
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
==
NULL
||
taosArrayGetSize
(
pNodeList
)
<=
0
)
{
qDebug
(
"QID:0x%"
PRIx64
" input exec nodeList is empty"
,
pDag
->
queryId
);
}
...
...
@@ -1375,6 +1373,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
SSchJob
*
pJob
=
NULL
;
SCH_ERR_RET
(
schInitJob
(
&
pJob
,
pDag
,
pTrans
,
pNodeList
,
sql
,
pRes
,
startTs
,
sync
));
qDebug
(
"QID:0x%"
PRIx64
" jobId:0x%"
PRIx64
" started"
,
pDag
->
queryId
,
pJob
->
refId
);
*
job
=
pJob
->
refId
;
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
...
...
@@ -1386,7 +1385,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
pJob
->
userCb
=
SCH_EXEC_CB
;
}
SCH_JOB_DLOG
(
"job exec done, job status:%s
"
,
SCH_GET_JOB_STATUS_STR
(
pJob
)
);
SCH_JOB_DLOG
(
"job exec done, job status:%s
, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
_return:
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
21658047
...
...
@@ -79,12 +79,19 @@ int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int6
int32_t
schedulerAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
schedulerExecCallback
fp
,
void
*
param
)
{
if
(
NULL
==
pTrans
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchResInfo
resInfo
=
{.
execFp
=
fp
,
.
userParam
=
param
};
SCH_RET
(
schAsyncExecJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
startTs
,
&
resInfo
));
int32_t
code
=
0
;
if
(
NULL
==
pTrans
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
fp
)
{
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
}
else
{
SSchResInfo
resInfo
=
{.
execFp
=
fp
,
.
userParam
=
param
};
code
=
schAsyncExecJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
startTs
,
&
resInfo
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
fp
(
NULL
,
param
,
code
);
}
return
code
;
}
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
...
...
@@ -95,7 +102,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped,
refId:
%"
PRIx64
,
job
);
qError
(
"acquire job from jobRef list failed, may be dropped,
jobId:0x
%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
...
...
@@ -108,27 +115,26 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_RET
(
code
);
}
int32_t
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
if
(
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
fp
(
NULL
,
param
,
TSDB_CODE_QRY_INVALID_INPUT
);
return
;
}
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
fp
(
NULL
,
param
,
TSDB_CODE_SCH_STATUS_ERROR
);
return
;
}
pJob
->
attr
.
syncSchedule
=
false
;
pJob
->
userRes
.
fetchFp
=
fp
;
pJob
->
userRes
.
userParam
=
param
;
code
=
schAsyncFetchRows
(
pJob
);
/*code = */
schAsyncFetchRows
(
pJob
);
schReleaseJob
(
job
);
SCH_RET
(
code
);
}
int32_t
schedulerGetTasksStatus
(
int64_t
job
,
SArray
*
pSub
)
{
...
...
@@ -165,7 +171,7 @@ _return:
int32_t
scheduleCancelJob
(
int64_t
job
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped,
refId:
%"
PRIx64
,
job
);
qError
(
"acquire job from jobRef list failed, may be dropped,
jobId:0x
%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
...
...
@@ -179,7 +185,7 @@ int32_t scheduleCancelJob(int64_t job) {
void
schedulerFreeJob
(
int64_t
job
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
q
Debug
(
"acquire job from jobRef list failed, may be dropped, refId:
%"
PRIx64
,
job
);
q
Error
(
"acquire job from jobRef list failed, may be dropped, jobId:0x
%"
PRIx64
,
job
);
return
;
}
...
...
tests/pytest/util/dnodes.py
浏览文件 @
21658047
...
...
@@ -247,7 +247,7 @@ class TDDnode:
if
bool
(
updatecfgDict
)
and
updatecfgDict
[
0
]
and
updatecfgDict
[
0
][
0
]:
print
(
updatecfgDict
[
0
][
0
])
for
key
,
value
in
updatecfgDict
[
0
][
0
].
items
():
if
key
==
"clientCfg"
:
if
key
==
"clientCfg"
and
self
.
remoteIP
==
""
and
not
platform
.
system
().
lower
()
==
'windows'
:
continue
if
value
==
'dataDir'
:
if
isFirstDir
:
...
...
tests/script/jenkins/basic.txt
浏览文件 @
21658047
...
...
@@ -135,6 +135,7 @@
./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim
# ./test.sh -f tsim/sync/3Replica5VgElect3mnode.sim
# --- catalog
./test.sh -f tsim/catalog/alterInCurrent.sim
...
...
tests/script/tsim/sync/3Replica5VgElect.sim
浏览文件 @
21658047
...
...
@@ -751,5 +751,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
tests/script/tsim/sync/3Replica5VgElect3mnode.sim
0 → 100644
浏览文件 @
21658047
此差异已折叠。
点击以展开。
tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim
0 → 100644
浏览文件 @
21658047
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
print ===> create clusters using four dnodes;
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> 1-dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][0] != 1 then
return -1
endi
if $data[0][4] != ready then
goto check_dnode_ready
endi
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnodes not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][4] != ready then
goto check_dnode_ready_1
endi
if $data[1][4] != ready then
goto check_dnode_ready_1
endi
if $data[2][4] != ready then
goto check_dnode_ready_1
endi
if $data[3][4] != ready then
goto check_dnode_ready_1
endi
$replica = 3
$vgroups = 5
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] $data[1][7] $data[1][8] $data[1][9] $data[1][10] $data[1][11]
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11]
print $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] $data[3][7] $data[3][8] $data[3][9] $data[3][10] $data[3][11]
print $data[4][0] $data[4][1] $data[4][2] $data[4][3] $data[4][4] $data[4][5] $data[4][6] $data[4][7] $data[4][8] $data[4][9] $data[4][10] $data[4][11]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == LEADER then
if $data[0][6] == FOLLOWER then
if $data[0][8] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == LEADER then
if $data[0][4] == FOLLOWER then
if $data[0][8] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == LEADER then
if $data[0][4] == FOLLOWER then
if $data[0][6] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready
endi
if $data[1][4] == LEADER then
if $data[1][6] == FOLLOWER then
if $data[1][8] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi
endi
elif $data[1][6] == LEADER then
if $data[1][4] == FOLLOWER then
if $data[1][8] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi
endi
elif $data[1][8] == LEADER then
if $data[1][4] == FOLLOWER then
if $data[1][6] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi
endi
else
goto check_vg_ready
endi
if $data[2][4] == LEADER then
if $data[2][6] == FOLLOWER then
if $data[2][8] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi
endi
elif $data[2][6] == LEADER then
if $data[2][4] == FOLLOWER then
if $data[2][8] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi
endi
elif $data[2][8] == LEADER then
if $data[2][4] == FOLLOWER then
if $data[2][6] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi
endi
else
goto check_vg_ready
endi
if $data[3][4] == LEADER then
if $data[3][6] == FOLLOWER then
if $data[3][8] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi
endi
elif $data[3][6] == LEADER then
if $data[3][4] == FOLLOWER then
if $data[3][8] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi
endi
elif $data[3][8] == LEADER then
if $data[3][4] == FOLLOWER then
if $data[3][6] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi
endi
else
goto check_vg_ready
endi
if $data[4][4] == LEADER then
if $data[4][6] == FOLLOWER then
if $data[4][8] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi
endi
elif $data[4][6] == LEADER then
if $data[4][4] == FOLLOWER then
if $data[4][8] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi
endi
elif $data[4][8] == LEADER then
if $data[4][4] == FOLLOWER then
if $data[4][6] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi
endi
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
$ctbPrefix = ctb
$ntbPrefix = ntb
$tbNum = 10
$i = 0
while $i < $tbNum
$ctb = $ctbPrefix . $i
sql create table $ctb using stb tags( $i )
$ntb = $ntbPrefix . $i
sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
$i = $i + 1
endw
$totalTblNum = $tbNum * 2
sleep 1000
sql show tables
print ====> expect $totalTblNum and infinsert $rows in fact
if $rows != $totalTblNum then
return -1
endi
sql connect
print ================ insert data
$dbNamme = db
$ctbPrefix = ctb
$ntbPrefix = ntb
$tbNum = 10
$rowNum = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
sql use $dbNamme
$i = 0
while $i < $tbNum
$ctb = $ctbPrefix . $i
$ntb = $ntbPrefix . $i
$x = 0
while $x < $rowNum
$binary = ' . binary
$binary = $binary . $i
$binary = $binary . '
sql insert into $ctb values ($tstart , $i , $x , $binary )
sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
print ================ create mnode
sql create mnode on dnode 2;
sql create mnode on dnode 3;
$loop_cnt = 0
check_mnode_ready_2:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> first create three mnode not ready!
return -1
endi
sql show mnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3]
if $data[0][0] != 1 then
return -1
endi
if $data[0][2] != LEADER then
goto check_mnode_ready_2
endi
if $data[1][2] != FOLLOWER then
goto check_mnode_ready_2
endi
if $data[2][2] != FOLLOWER then
goto check_mnode_ready_2
endi
$loop_cnt = 0
check_vg_ready1:
$loop_cnt = $loop_cnt + 1
print $loop_cnt
sleep 202
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] $data[1][7] $data[1][8] $data[1][9] $data[1][10] $data[1][11]
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11]
print $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] $data[3][7] $data[3][8] $data[3][9] $data[3][10] $data[3][11]
print $data[4][0] $data[4][1] $data[4][2] $data[4][3] $data[4][4] $data[4][5] $data[4][6] $data[4][7] $data[4][8] $data[4][9] $data[4][10] $data[4][11]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == LEADER then
if $data[0][6] == FOLLOWER then
if $data[0][8] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == LEADER then
if $data[0][4] == FOLLOWER then
if $data[0][8] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == LEADER then
if $data[0][4] == FOLLOWER then
if $data[0][6] == FOLLOWER then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready1
endi
if $data[1][4] == LEADER then
if $data[1][6] == FOLLOWER then
if $data[1][8] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi
endi
elif $data[1][6] == LEADER then
if $data[1][4] == FOLLOWER then
if $data[1][8] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi
endi
elif $data[1][8] == LEADER then
if $data[1][4] == FOLLOWER then
if $data[1][6] == FOLLOWER then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi
endi
else
goto check_vg_ready1
endi
if $data[2][4] == LEADER then
if $data[2][6] == FOLLOWER then
if $data[2][8] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi
endi
elif $data[2][6] == LEADER then
if $data[2][4] == FOLLOWER then
if $data[2][8] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi
endi
elif $data[2][8] == LEADER then
if $data[2][4] == FOLLOWER then
if $data[2][6] == FOLLOWER then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi
endi
else
goto check_vg_ready1
endi
if $data[3][4] == LEADER then
if $data[3][6] == FOLLOWER then
if $data[3][8] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi
endi
elif $data[3][6] == LEADER then
if $data[3][4] == FOLLOWER then
if $data[3][8] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi
endi
elif $data[3][8] == LEADER then
if $data[3][4] == FOLLOWER then
if $data[3][6] == FOLLOWER then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi
endi
else
goto check_vg_ready1
endi
if $data[4][4] == LEADER then
if $data[4][6] == FOLLOWER then
if $data[4][8] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi
endi
elif $data[4][6] == LEADER then
if $data[4][4] == FOLLOWER then
if $data[4][8] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi
endi
elif $data[4][8] == LEADER then
if $data[4][4] == FOLLOWER then
if $data[4][6] == FOLLOWER then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi
endi
else
goto check_vg_ready1
endi
print ====> final test: create stable/child table
sql create table stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 2 then
return -1
endi
$ctbPrefix = ctb1
$ntbPrefix = ntb1
$tbNum = 10
$i = 0
while $i < $tbNum
$ctb = $ctbPrefix . $i
sql create table $ctb using stb1 tags( $i )
$ntb = $ntbPrefix . $i
sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
$i = $i + 1
endw
sleep 1000
sql show stables
if $rows != 2 then
return -1
endi
sql show tables
if $rows != 40 then
return -1
endi
system sh/deploy.sh -n dnode5 -i 5
system sh/exec.sh -n dnode5 -s start
sql connect
sql create dnode $hostname port 7500
$loop_cnt = 0
check_dnode_ready3:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> 5 dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
print ===> $rows $data[4][0] $data[4][1] $data[4][2] $data[4][3] $data[4][4] $data[4][5] $data[4][6]
if $rows != 5 then
return -1
endi
if $data[4][4] != ready then
goto check_dnode_ready3
endi
print ===> 1:if create users sucessfully,then drop mnode leader
sql create user chr pass '123'
$loop_cnt = 0
check_user_ready:
$loop_cnt = $loop_cnt + 1
print $loop_cnt
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show users
print ===> rows: $rows
print ===> $rows $data[0][0] $data[0][1] $data[0][2]
print ===> $rows $data[1][0] $data[1][1] $data[1][2]
if $rows != 2 then
goto check_user_ready
endi
if $data[0][0] == chr then
goto check_user_ready_suc
elif $data[1][0] == chr
goto check_user_ready_suc
else
print ====> creating user failed
goto check_user_ready
endi
check_user_ready_suc:
$loop_cnt = 0
check_mnode_ready_3:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> second mnode not ready!
return -1
endi
sql show mnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3]
if $data[0][0] != 1 then
return -1
endi
if $data[0][2] == LEADER then
if $data[1][2] != FOLLOWER then
goto check_mnode_ready_3
endi
if $data[2][2] != FOLLOWER then
goto check_mnode_ready_3
endi
endi
if $data[1][2] == LEADER then
if $data[0][2] != FOLLOWER then
goto check_mnode_ready_3
endi
if $data[2][2] != FOLLOWER then
goto check_mnode_ready_3
endi
endi
if $data[2][2] == LEADER then
if $data[1][2] != FOLLOWER then
goto check_mnode_ready_3
endi
if $data[0][2] != FOLLOWER then
goto check_mnode_ready_3
endi
endi
sleep 2000
# stop leader and drop dnode
system sh/exec.sh -n dnode1 -s stop
sleep 2000
print ===> 2:if create users sucessfully,then drop mnode leader
sql create user chr2 pass '123'
$loop_cnt = 0
check_user_ready2:
$loop_cnt = $loop_cnt + 1
print $loop_cnt
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show mnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3]
sql show users
print ===> rows: $rows
print ===> $rows $data[0][0] $data[0][1] $data[0][2]
print ===> $rows $data[1][0] $data[1][1] $data[1][2]
print ===> $rows $data[2][0] $data[2][1] $data[2][2]
if $rows != 3 then
goto check_user_ready2
endi
if $data[0][0] == chr2 then
goto check_user_ready2_suc
elif $data[1][0] == chr2
goto check_user_ready2_suc
else
print ====> creating user failed
goto check_user_ready2
endi
check_user_ready2_suc:
tests/script/tsim/testsuit.sim
浏览文件 @
21658047
#run user/pass_alter.sim
#run user/basic1.sim
#run user/privilege2.sim
#run user/user_len.sim
#run user/privilege1.sim
#run user/pass_len.sim
#run tstream/basic1.sim
#run tstream/basic0.sim
#run table/basic1.sim
#run trans/create_db.sim
#run stable/alter1.sim
#run stable/vnode3.sim
#run stable/metrics.sim
#run stable/show.sim
#run stable/values.sim
#run stable/dnode3.sim
#run stable/refcount.sim
#run stable/disk.sim
#run db/basic1.sim
#run db/basic3.sim
#run db/basic7.sim
#run db/basic6.sim
#run db/create_all_options.sim
#run db/basic2.sim
#run db/error1.sim
#run db/taosdlog.sim
#run db/alter_option.sim
#run mnode/basic1.sim
#run parser/fourArithmetic-basic.sim
#run parser/groupby-basic.sim
#run snode/basic1.sim
#run query/time_process.sim
#run query/stddev.sim
#run query/interval-offset.sim
#run query/charScalarFunction.sim
#run query/complex_select.sim
#run query/explain.sim
#run query/crash_sql.sim
#run query/diff.sim
#run query/complex_limit.sim
#run query/complex_having.sim
#run query/udf.sim
#run query/complex_group.sim
#run query/interval.sim
#run query/session.sim
print ========> dead lock failed when 2 rows in outputCapacity
run query/scalarFunction.sim
run query/scalarNull.sim
run query/complex_where.sim
run tmq/basic1.sim
run tmq/basic4.sim
run tmq/basic1Of2Cons.sim
run tmq/prepareBasicEnv-1vgrp.sim
run tmq/topic.sim
run tmq/basic4Of2Cons.sim
run tmq/prepareBasicEnv-4vgrp.sim
run tmq/basic3.sim
run tmq/basic2Of2Cons.sim
run tmq/basic2.sim
run tmq/basic3Of2Cons.sim
run tmq/basic2Of2ConsOverlap.sim
run tmq/clearConsume.sim
run qnode/basic1.sim
run dnode/basic1.sim
run show/basic.sim
run insert/basic1.sim
run insert/basic0.sim
run insert/backquote.sim
run insert/null.sim
run sync/oneReplica1VgElectWithInsert.sim
run sync/threeReplica1VgElect.sim
run sync/oneReplica1VgElect.sim
run sync/insertDataByRunBack.sim
run sync/threeReplica1VgElectWihtInsert.sim
run sma/tsmaCreateInsertData.sim
run sma/rsmaCreateInsertQuery.sim
run valgrind/checkError.sim
run bnode/basic1.sim
run tsim/user/pass_alter.sim
run tsim/user/basic1.sim
run tsim/user/privilege2.sim
run tsim/user/user_len.sim
run tsim/user/privilege1.sim
run tsim/user/pass_len.sim
run tsim/table/basic1.sim
run tsim/trans/lossdata1.sim
run tsim/trans/create_db.sim
run tsim/stable/alter_metrics.sim
run tsim/stable/tag_modify.sim
run tsim/stable/alter_comment.sim
run tsim/stable/column_drop.sim
run tsim/stable/column_modify.sim
run tsim/stable/tag_rename.sim
run tsim/stable/vnode3.sim
run tsim/stable/metrics.sim
run tsim/stable/alter_insert2.sim
run tsim/stable/show.sim
run tsim/stable/alter_import.sim
run tsim/stable/tag_add.sim
run tsim/stable/tag_drop.sim
run tsim/stable/column_add.sim
run tsim/stable/alter_count.sim
run tsim/stable/values.sim
run tsim/stable/dnode3.sim
run tsim/stable/alter_insert1.sim
run tsim/stable/refcount.sim
run tsim/stable/disk.sim
run tsim/db/basic1.sim
run tsim/db/basic3.sim
run tsim/db/basic7.sim
run tsim/db/basic6.sim
run tsim/db/create_all_options.sim
run tsim/db/basic2.sim
run tsim/db/error1.sim
run tsim/db/taosdlog.sim
run tsim/db/alter_option.sim
run tsim/mnode/basic1.sim
run tsim/mnode/basic3.sim
run tsim/mnode/basic2.sim
run tsim/parser/fourArithmetic-basic.sim
run tsim/parser/groupby-basic.sim
run tsim/snode/basic1.sim
run tsim/query/time_process.sim
run tsim/query/stddev.sim
run tsim/query/interval-offset.sim
run tsim/query/charScalarFunction.sim
run tsim/query/complex_select.sim
run tsim/query/explain.sim
run tsim/query/crash_sql.sim
run tsim/query/diff.sim
run tsim/query/complex_limit.sim
run tsim/query/complex_having.sim
run tsim/query/udf.sim
run tsim/query/complex_group.sim
run tsim/query/interval.sim
run tsim/query/session.sim
run tsim/query/scalarFunction.sim
run tsim/query/scalarNull.sim
run tsim/query/complex_where.sim
run tsim/tmq/basic1.sim
run tsim/tmq/basic4.sim
run tsim/tmq/basic1Of2Cons.sim
run tsim/tmq/prepareBasicEnv-1vgrp.sim
run tsim/tmq/topic.sim
run tsim/tmq/basic4Of2Cons.sim
run tsim/tmq/prepareBasicEnv-4vgrp.sim
run tsim/tmq/basic3.sim
run tsim/tmq/basic2Of2Cons.sim
run tsim/tmq/basic2.sim
run tsim/tmq/basic3Of2Cons.sim
run tsim/tmq/basic2Of2ConsOverlap.sim
run tsim/tmq/clearConsume.sim
run tsim/qnode/basic1.sim
run tsim/dnode/basic1.sim
run tsim/show/basic.sim
run tsim/stream/basic1.sim
run tsim/stream/triggerInterval0.sim
run tsim/stream/triggerSession0.sim
run tsim/stream/basic0.sim
run tsim/stream/session0.sim
run tsim/stream/session1.sim
run tsim/stream/basic2.sim
run tsim/insert/basic1.sim
run tsim/insert/commit-merge0.sim
run tsim/insert/basic0.sim
run tsim/insert/update0.sim
run tsim/insert/backquote.sim
run tsim/insert/null.sim
run tsim/sync/oneReplica1VgElectWithInsert.sim
run tsim/sync/threeReplica1VgElect.sim
run tsim/sync/oneReplica1VgElect.sim
run tsim/sync/3Replica5VgElect.sim
run tsim/sync/insertDataByRunBack.sim
run tsim/sync/oneReplica5VgElect.sim
run tsim/sync/3Replica1VgElect.sim
run tsim/sync/threeReplica1VgElectWihtInsert.sim
run tsim/sma/tsmaCreateInsertData.sim
run tsim/sma/rsmaCreateInsertQuery.sim
run tsim/valgrind/basic.sim
run tsim/valgrind/checkError.sim
run tsim/bnode/basic1.sim
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录