Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bd2535f1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
bd2535f1
编写于
12月 17, 2021
作者:
D
dapan1121
提交者:
GitHub
12月 17, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9150 from taosdata/catalog_dev
Catalog dev
上级
3b298478
aad78e65
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
391 addition
and
198 deletion
+391
-198
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+64
-20
include/libs/query/query.h
include/libs/query/query.h
+14
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+10
-24
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+38
-23
source/libs/catalog/test/CMakeLists.txt
source/libs/catalog/test/CMakeLists.txt
+1
-1
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+83
-110
source/libs/query/inc/queryInt.h
source/libs/query/inc/queryInt.h
+0
-11
source/libs/query/src/querymsg.c
source/libs/query/src/querymsg.c
+1
-1
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+1
-1
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+48
-6
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+129
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
bd2535f1
...
...
@@ -32,8 +32,7 @@ extern "C" {
struct
SCatalog
;
typedef
struct
SCatalogReq
{
char
dbName
[
TSDB_DB_NAME_LEN
];
SArray
*
pTableName
;
// table full name
SArray
*
pTableName
;
// element is SNAME
SArray
*
pUdf
;
// udf name
bool
qNodeRequired
;
// valid qnode
}
SCatalogReq
;
...
...
@@ -54,10 +53,10 @@ typedef struct SCatalogCfg {
int32_t
catalogInit
(
SCatalogCfg
*
cfg
);
/**
*
Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
*
There is ONLY one SCatalog object for one process space, and this function returns a singleton.
* @param c
lusterId
* @return
*
Get a cluster's catalog handle for all later operations.
*
@param clusterId (input, end with \0)
* @param c
atalogHandle (output, NO need to free it)
* @return
error code
*/
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
);
...
...
@@ -65,29 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroupCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
/**
* Get a table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
pTableMeta
);
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @return error code
*/
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
);
/**
* Force renew a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
pTableMeta
);
/**
* get table's vgroup list.
* @param clusterId
* @pVgroupList - array of SVgroupInfo
* @return
* Get a table's actual vgroup, for stable it's all possible vgroup list.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
/**
* Get a table's vgroup from its name's hash value.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t
catalogGetTable
Vgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
int32_t
catalogGetTable
HashVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
vgInfo
);
/**
* Get
the required meta data from mnode
.
*
Note that this is a synchronized API and is also thread-safety.
* @param p
Catalog
* @param pMgmtEps
* @param p
MetaReq
* @param p
MetaData
* @return
* Get
all meta data required in pReq
.
*
@param pCatalog (input, got with catalogGetHandle)
* @param p
Rpc (input, rpc object)
* @param pMgmtEps
(input, mnode EPs)
* @param p
Req (input, reqest info)
* @param p
Rsp (output, response data)
* @return
error code
*/
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
...
...
@@ -98,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S
/**
* Destroy catalog and relase all resources
* @param pCatalog
*/
void
catalogDestroy
(
void
);
...
...
include/libs/query/query.h
浏览文件 @
bd2535f1
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "tlog.h"
typedef
SVgroupListRspMsg
SVgroupListInfo
;
...
...
@@ -85,6 +86,19 @@ typedef struct STableMetaOutput {
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
extern
void
msgInit
();
extern
int32_t
qDebugFlag
;
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
bd2535f1
...
...
@@ -20,6 +20,12 @@
extern
"C"
{
#endif
#include "planner.h"
typedef
struct
SSchedulerCfg
{
}
SSchedulerCfg
;
typedef
struct
SQueryProfileSummary
{
int64_t
startTs
;
// Object created and added into the message queue
int64_t
endTs
;
// the timestamp when the task is completed
...
...
@@ -43,43 +49,23 @@ typedef struct SQueryProfileSummary {
uint64_t
resultSize
;
// generated result size in Kb.
}
SQueryProfileSummary
;
typedef
struct
SQueryTask
{
uint64_t
queryId
;
// query id
uint64_t
taskId
;
// task id
char
*
pSubplan
;
// operator tree
uint64_t
status
;
// task status
SQueryProfileSummary
summary
;
// task execution summary
void
*
pOutputHandle
;
// result buffer handle, to temporarily keep the output result for next stage
}
SQueryTask
;
typedef
struct
SQueryJob
{
SArray
**
pSubtasks
;
// todo
}
SQueryJob
;
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param pJob
* @return
*/
int32_t
qProcessQueryJob
(
struct
SQueryJob
*
pJob
);
int32_t
scheduleQueryJob
(
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
*
data
);
/**
* The SSqlObj should not be here????
* @param pSql
* @param pVgroupId
* @param pRetVgroupId
* @return
*/
//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId);
/**
* Cancel query job
* @param pJob
* @return
*/
int32_t
qKillQueryJob
(
struct
SQueryJob
*
pJob
);
int32_t
scheduleCancelJob
(
void
*
pJob
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
bd2535f1
...
...
@@ -324,6 +324,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error")
#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input")
// grant
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
bd2535f1
...
...
@@ -54,15 +54,30 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
return
code
;
}
char
*
pMsg
=
rpcMallocCont
(
msgLen
);
if
(
NULL
==
pMsg
)
{
ctgError
(
"rpc malloc %d failed"
,
msgLen
);
tfree
(
msg
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
memcpy
(
pMsg
,
msg
,
msgLen
);
tfree
(
msg
);
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_USE_DB
,
.
pCont
=
m
sg
,
.
pCont
=
pM
sg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"error rsp for use db, code:%x"
,
rpcRsp
.
code
);
return
rpcRsp
.
code
;
}
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
...
...
@@ -169,9 +184,9 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
ctgGenEpSet
(
&
epSet
,
vgroupInfo
);
rpcSendRecv
(
pRpc
,
&
epSet
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"
get table meta from mnode failed, error code:%d
"
,
rpcRsp
.
code
);
ctgError
(
"
error rsp for table meta, code:%x
"
,
rpcRsp
.
code
);
return
rpcRsp
.
code
;
}
...
...
@@ -254,24 +269,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
}
int32_t
ctgGetTableHashVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
SDBVgroupInfo
dbInfo
=
{
0
};
int32_t
code
=
0
;
int32_t
vgId
=
0
;
CTG_ERR_RET
(
catalogGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
false
,
&
dbInfo
));
if
(
dbInfo
.
vgVersion
<
0
||
NULL
==
dbInfo
.
vgInfo
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p"
,
pDBName
,
dbInfo
.
vgVersion
,
dbInfo
.
vgInfo
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
}
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
&
dbInfo
,
pDBName
,
pTableName
,
pVgroup
));
return
code
;
}
STableMeta
*
ctgCreateSTableMeta
(
STableMetaMsg
*
pChild
)
{
assert
(
pChild
!=
NULL
);
...
...
@@ -554,7 +551,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
SVgroupInfo
vgroupInfo
=
{
0
};
CTG_ERR_RET
(
c
t
gGetTableHashVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
&
vgroupInfo
));
CTG_ERR_RET
(
c
atalo
gGetTableHashVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
&
vgroupInfo
));
STableMetaOutput
output
=
{
0
};
...
...
@@ -571,7 +568,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
return
ctgGetTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
true
,
pTableMeta
);
}
int32_t
catalogGetTableVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
int32_t
catalogGetTable
Dist
Vgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
...
...
@@ -607,6 +604,24 @@ _return:
}
int32_t
catalogGetTableHashVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
SDBVgroupInfo
dbInfo
=
{
0
};
int32_t
code
=
0
;
int32_t
vgId
=
0
;
CTG_ERR_RET
(
catalogGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
false
,
&
dbInfo
));
if
(
dbInfo
.
vgVersion
<
0
||
NULL
==
dbInfo
.
vgInfo
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p"
,
pDBName
,
dbInfo
.
vgVersion
,
dbInfo
.
vgInfo
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
}
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
&
dbInfo
,
pDBName
,
pTableName
,
pVgroup
));
return
code
;
}
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pReq
||
NULL
==
pRsp
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
...
...
source/libs/catalog/test/CMakeLists.txt
浏览文件 @
bd2535f1
...
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
catalogTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
catalogTest
PUBLIC os util common catalog transport gtest query
PUBLIC os util common catalog transport gtest query
taos
)
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
bd2535f1
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
...
...
@@ -23,130 +23,103 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
namespace
{
}
TEST
(
testCase
,
normalCase
)
{
char
*
clusterId
=
"cluster1"
;
struct
SCatalog
*
pCtg
=
NULL
;
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
}
/*
TEST(testCase, normalCase) {
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
int32_t num = tsCompatibleModel? 2:1;
ASSERT_EQ(taosArrayGetSize(pExprList), num);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 1);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
tExprNode* pParam = p1->pExpr->_function.pChild[0];
#include "tvariant.h"
#include "catalog.h"
#include "tep.h"
#include "trpc.h"
typedef
struct
SAppInstInfo
{
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
}
SAppInstInfo
;
typedef
struct
STscObj
{
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
acctId
[
TSDB_ACCT_ID_LEN
];
char
db
[
TSDB_ACCT_ID_LEN
+
TSDB_DB_NAME_LEN
];
uint32_t
connId
;
uint64_t
id
;
// ref ID returned by taosAddRef
// struct SSqlObj *sqlList;
void
*
pTransporter
;
pthread_mutex_t
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj from this tscObj
SAppInstInfo
*
pAppInfo
;
}
STscObj
;
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
namespace
{
char* str = NULL;
queryPlanToString(n, &str);
printf("%s\n", str);
void
sendCreateDbMsg
(
void
*
shandle
,
SEpSet
*
pEpSet
)
{
SCreateDbMsg
*
pReq
=
(
SCreateDbMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateDbMsg
));
strcpy
(
pReq
->
db
,
"1.db1"
);
pReq
->
numOfVgroups
=
htonl
(
2
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRowsPerFileBlock
=
htonl
(
100
);
pReq
->
maxRowsPerFileBlock
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replications
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
ignoreExist
=
1
;
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SCreateDbMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CREATE_DB
;
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
shandle
,
pEpSet
,
&
rpcMsg
,
&
rpcRsp
);
ASSERT_EQ
(
rpcRsp
.
code
,
0
);
}
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
TEST(testCase, displayPlan) {
generateLogicplan("select count(*) from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
generateLogicplan("select count(*) from `t.1abc` group by a");
generateLogicplan("select count(A+B) from `t.1abc` group by a");
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
TEST
(
testCase
,
normalCase
)
{
STscObj
*
pConn
=
(
STscObj
*
)
taos_connect
(
"127.0.0.1"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
// order by + group by column + limit offset
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
// fill
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
msgInit
();
// union + union all
sendCreateDbMsg
(
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
,
dbname
,
tablename
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
// join
taos_close
(
pConn
);
}
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
}
*/
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/query/inc/queryInt.h
浏览文件 @
bd2535f1
...
...
@@ -21,17 +21,6 @@ extern "C" {
#endif
#include "tlog.h"
extern
int32_t
qDebugFlag
;
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
...
...
source/libs/query/src/querymsg.c
浏览文件 @
bd2535f1
...
...
@@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
hton
s
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
...
...
source/libs/scheduler/CMakeLists.txt
浏览文件 @
bd2535f1
...
...
@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries
(
scheduler
PRIVATE os util planner common
PRIVATE os util planner common
query
)
\ No newline at end of file
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
bd2535f1
...
...
@@ -24,15 +24,57 @@ extern "C" {
#include "tarray.h"
#include "planner.h"
#include "scheduler.h"
#include "thash.h"
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
enum
{
SCH_STATUS_NOT_START
=
1
,
SCH_STATUS_EXECUTING
,
SCH_STATUS_SUCCEED
,
SCH_STATUS_FAILED
,
SCH_STATUS_CANCELLING
,
SCH_STATUS_CANCELLED
};
typedef
struct
SSchedulerMgmt
{
SHashObj
*
Jobs
;
// key: queryId, value: SQueryJob*
}
SSchedulerMgmt
;
typedef
struct
SQueryTask
{
uint64_t
taskId
;
// task id
char
*
pSubplan
;
// operator tree
int8_t
status
;
// task status
SQueryProfileSummary
summary
;
// task execution summary
}
SQueryTask
;
typedef
struct
SQueryLevel
{
int8_t
status
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
SArray
*
subPlans
;
// Element is SSubplan
}
SQueryLevel
;
typedef
struct
SQueryJob
{
uint64_t
queryId
;
int32_t
levelNum
;
int32_t
levelIdx
;
int8_t
status
;
SQueryProfileSummary
summary
;
SArray
*
levels
;
// Element is SQueryLevel, starting from 0.
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryJob
;
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
typedef
struct
SQuery
{
SArray
**
pSubquery
;
int32_t
numOfLevels
;
int32_t
currentLevel
;
}
SQuery
;
#ifdef __cplusplus
}
#endif
#endif
/*_TD_SCHEDULER_INT_H_*/
\ No newline at end of file
#endif
/*_TD_SCHEDULER_INT_H_*/
source/libs/scheduler/src/scheduler.c
浏览文件 @
bd2535f1
...
...
@@ -13,4 +13,132 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "schedulerInt.h"
\ No newline at end of file
#include "schedulerInt.h"
#include "taosmsg.h"
#include "query.h"
SSchedulerMgmt
schMgmt
=
{
0
};
int32_t
schBuildAndSendRequest
(
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
__taos_async_fn_t
fp
)
{
/*
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
*/
}
int32_t
schValidateAndBuildJob
(
SQueryDag
*
dag
,
SQueryJob
*
job
)
{
int32_t
levelNum
=
(
int32_t
)
taosArrayGetSize
(
dag
->
pSubplans
);
if
(
levelNum
<=
0
)
{
qError
(
"invalid level num:%d"
,
levelNum
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
job
->
levels
=
taosArrayInit
(
levelNum
,
sizeof
(
SQueryLevel
));
if
(
NULL
==
job
->
levels
)
{
qError
(
"taosArrayInit %d failed"
,
levelNum
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
job
->
levelNum
=
levelNum
;
job
->
levelIdx
=
levelNum
-
1
;
job
->
status
=
SCH_STATUS_NOT_START
;
job
->
subPlans
=
dag
->
pSubplans
;
SQueryLevel
level
=
{
0
};
SArray
*
levelPlans
=
NULL
;
int32_t
levelPlanNum
=
0
;
for
(
int32_t
i
=
0
;
i
<
levelNum
;
++
i
)
{
levelPlans
=
taosArrayGetP
(
dag
->
pSubplans
,
i
);
if
(
NULL
==
levelPlans
)
{
qError
(
"no level plans for level %d"
,
i
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
levelPlanNum
=
(
int32_t
)
taosArrayGetSize
(
levelPlans
);
if
(
levelPlanNum
<=
0
)
{
qError
(
"invalid level plans number:%d, level:%d"
,
levelPlanNum
,
i
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
for
(
int32_t
n
=
0
;
n
<
levelPlanNum
;
++
n
)
{
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
schMgmt
.
Jobs
=
taosHashInit
(
SCHEDULE_DEFAULT_JOB_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
schMgmt
.
Jobs
)
{
SCH_ERR_LRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
,
"init %d schduler jobs failed"
,
SCHEDULE_DEFAULT_JOB_NUMBER
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
scheduleQueryJob
(
SQueryDag
*
pDag
,
void
**
pJob
)
{
if
(
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQueryJob
*
job
=
calloc
(
1
,
sizeof
(
SQueryJob
));
if
(
NULL
==
job
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
schValidateAndBuildJob
(
pDag
,
job
);
*
(
SQueryJob
**
)
pJob
=
job
;
}
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
*
data
);
int32_t
scheduleCancelJob
(
void
*
pJob
);
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
Jobs
)
{
taosHashCleanup
(
schMgmt
.
Jobs
);
//TBD
schMgmt
.
Jobs
=
NULL
;
}
}
source/util/src/terror.c
浏览文件 @
bd2535f1
...
...
@@ -322,6 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INCONSISTAN
,
"File inconsistance in replica"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_TIME_CONDITION
,
"One valid time range condition expected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_SYS_ERROR
,
"System error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_INPUT
,
"invalid input"
)
// grant
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录