Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ccc91cec
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ccc91cec
编写于
12月 31, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
240a5309
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
573 addition
and
106 deletion
+573
-106
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-1
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+18
-4
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+18
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+91
-44
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+415
-19
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+2
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+15
-16
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+4
-3
source/util/src/thash.c
source/util/src/thash.c
+9
-9
未找到文件。
include/libs/qcom/query.h
浏览文件 @
ccc91cec
...
...
@@ -75,7 +75,7 @@ typedef struct STableMeta {
}
STableMeta
;
typedef
struct
SDBVgroupInfo
{
int32_t
lock
;
SRWLatch
lock
;
int32_t
vgVersion
;
int8_t
hashMethod
;
SHashObj
*
vgInfo
;
//key:vgId, value:SVgroupInfo
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
ccc91cec
...
...
@@ -50,23 +50,37 @@ typedef struct SQueryProfileSummary {
uint64_t
resultSize
;
// generated result size in Kb.
}
SQueryProfileSummary
;
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
//vgId or qnodeId
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SQueryNodeAddr
;
typedef
struct
SQueryResult
{
int32_t
code
;
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
}
SQueryResult
;
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param
qnodeList Qnode address list, element is SEp
Addr
* @param
nodeList Qnode/Vnode address list, element is SQueryNode
Addr
* @return
*/
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
uint64_t
*
numOfRow
s
);
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
SQueryResult
*
pRe
s
);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param
qnodeList Qnode address list, element is SEp
Addr
* @param
nodeList Qnode/Vnode address list, element is SQueryNode
Addr
* @return
*/
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
q
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
);
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
ccc91cec
...
...
@@ -77,27 +77,37 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
if ((*(_lock)) < 0) assert(0); \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) > 0); \
ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
ccc91cec
...
...
@@ -23,21 +23,32 @@ SCatalogMgmt ctgMgmt = {0};
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
bool
*
inCache
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
inCache
=
false
;
ctgWarn
(
"no db cache"
);
return
TSDB_CODE_SUCCESS
;
}
SDBVgroupInfo
*
info
=
taosHashAcquire
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
))
;
SDBVgroupInfo
*
info
=
NULL
;
if
(
NULL
==
info
)
{
*
inCache
=
false
;
return
TSDB_CODE_SUCCESS
;
}
while
(
true
)
{
info
=
taosHashAcquire
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
CTG_LOCK
(
CTG_READ
,
&
info
->
lock
);
if
(
NULL
==
info
->
vgInfo
)
{
CTG_UNLOCK
(
CTG_READ
,
&
info
->
lock
);
*
inCache
=
false
;
return
TSDB_CODE_SUCCESS
;
if
(
NULL
==
info
)
{
*
inCache
=
false
;
assert
(
0
);
ctgWarn
(
"no db cache, dbName:%s"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
}
CTG_LOCK
(
CTG_READ
,
&
info
->
lock
);
if
(
NULL
==
info
->
vgInfo
)
{
CTG_UNLOCK
(
CTG_READ
,
&
info
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
info
);
ctgWarn
(
"db cache vgInfo is NULL, dbName:%s"
,
dbName
);
continue
;
}
break
;
}
*
dbInfo
=
info
;
...
...
@@ -271,8 +282,6 @@ _return:
int32_t
ctgGetVgInfoFromHashValue
(
SDBVgroupInfo
*
dbInfo
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
int32_t
code
=
0
;
CTG_LOCK
(
CTG_READ
,
&
dbInfo
->
lock
);
int32_t
vgNum
=
taosHashGetSize
(
dbInfo
->
vgInfo
);
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
db
);
...
...
@@ -311,8 +320,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
*
pVgroup
=
*
vgInfo
;
_return:
CTG_UNLOCK
(
CTG_READ
,
&
dbInfo
->
lock
);
CTG_RET
(
TSDB_CODE_SUCCESS
);
}
...
...
@@ -422,6 +429,8 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
if
(
0
==
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
inCache
));
assert
(
inCache
);
if
(
inCache
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -434,11 +443,47 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
input
.
db
[
sizeof
(
input
.
db
)
-
1
]
=
0
;
input
.
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
while
(
true
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
CTG_ERR_RET
(
catalogUpdateDBVgroup
(
pCatalog
,
dbName
,
&
DbOut
.
dbVgroup
));
CTG_ERR_RET
(
catalogUpdateDBVgroup
(
pCatalog
,
dbName
,
&
DbOut
.
dbVgroup
));
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
inCache
));
if
(
!
inCache
)
{
ctgWarn
(
"get db vgroup from cache failed, db:%s"
,
dbName
);
continue
;
}
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
inCache
));
break
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgValidateAndRemoveDb
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
SDBVgroupInfo
*
oldInfo
=
(
SDBVgroupInfo
*
)
taosHashAcquire
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
oldInfo
)
{
CTG_LOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
if
(
dbInfo
->
vgVersion
<=
oldInfo
->
vgVersion
)
{
ctgInfo
(
"dbName:%s vg will not update, vgVersion:%d , current:%d"
,
dbName
,
dbInfo
->
vgVersion
,
oldInfo
->
vgVersion
);
CTG_UNLOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
oldInfo
);
return
TSDB_CODE_SUCCESS
;
}
if
(
oldInfo
->
vgInfo
)
{
ctgInfo
(
"dbName:%s vg will be cleanup"
,
dbName
);
taosHashCleanup
(
oldInfo
->
vgInfo
);
oldInfo
->
vgInfo
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
oldInfo
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -581,55 +626,57 @@ _return:
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
int32_t
code
=
0
;
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
dbInfo
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
NULL
==
dbInfo
->
vgInfo
||
dbInfo
->
vgVersion
<
0
||
taosHashGetSize
(
dbInfo
->
vgInfo
)
<=
0
)
{
ctgError
(
"invalid db vg, dbName:%s"
,
dbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
dbInfo
->
vgVersion
<
0
)
{
ctgWarn
(
"invalid db vgVersion:%d, dbName:%s"
,
dbInfo
->
vgVersion
,
dbName
);
if
(
pCatalog
->
dbCache
.
cache
)
{
SDBVgroupInfo
*
oldInfo
=
taosHashAcquire
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
oldInfo
)
{
CTG_LOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
if
(
oldInfo
->
vgInfo
)
{
taosHashCleanup
(
oldInfo
->
vgInfo
);
oldInfo
->
vgInfo
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
oldInfo
);
}
CTG_ERR_JRET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbName
,
dbInfo
));
CTG_ERR_JRET
(
taosHashRemove
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
)));
}
ctgWarn
(
"remove db [%s] from cache"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
goto
_return
;
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_
J
RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
else
{
SDBVgroupInfo
*
oldInfo
=
taosHashAcquire
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
oldInfo
)
{
CTG_LOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
if
(
oldInfo
->
vgInfo
)
{
taosHashCleanup
(
oldInfo
->
vgInfo
);
oldInfo
->
vgInfo
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
oldInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
oldInfo
);
}
CTG_ERR_JRET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbName
,
dbInfo
));
}
if
(
taosHashPut
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
),
dbInfo
,
sizeof
(
*
dbInfo
))
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_
J
RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
ctgDebug
(
"dbName:%s vgroup updated, vgVersion:%d"
,
dbName
,
dbInfo
->
vgVersion
);
dbInfo
->
vgInfo
=
NULL
;
_return:
if
(
dbInfo
&&
dbInfo
->
vgInfo
)
{
taosHashCleanup
(
dbInfo
->
vgInfo
);
dbInfo
->
vgInfo
=
NULL
;
}
CTG_RET
(
code
);
}
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
ccc91cec
...
...
@@ -36,11 +36,19 @@
namespace
{
extern
"C"
int32_t
ctgGetTableMetaFromCache
(
struct
SCatalog
*
pCatalog
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
);
extern
"C"
int32_t
ctgUpdateTableMetaCache
(
struct
SCatalog
*
pCatalog
,
STableMetaOutput
*
output
);
void
ctgTestSetPrepareTableMeta
();
void
ctgTestSetPrepareCTableMeta
();
void
ctgTestSetPrepareSTableMeta
();
bool
ctgTestStop
=
false
;
bool
ctgTestEnableSleep
=
false
;
bool
ctgTestDeadLoop
=
true
;
int32_t
ctgTestCurrentVgVersion
=
0
;
int32_t
ctgTestVgVersion
=
1
;
int32_t
ctgTestVgNum
=
10
;
int32_t
ctgTestColNum
=
2
;
int32_t
ctgTestTagNum
=
1
;
...
...
@@ -89,6 +97,113 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ
(
rpcRsp
.
code
,
0
);
}
void
ctgTestInitLogFile
()
{
const
char
*
defaultLogFileNamePrefix
=
"taoslog"
;
const
int32_t
maxLogFileNum
=
10
;
ctgDebugFlag
=
159
;
tsAsyncLog
=
0
;
char
temp
[
128
]
=
{
0
};
sprintf
(
temp
,
"%s/%s"
,
tsLogDir
,
defaultLogFileNamePrefix
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
}
}
int32_t
ctgTestGetVgNumFromVgVersion
(
int32_t
vgVersion
)
{
return
((
vgVersion
%
2
)
==
0
)
?
ctgTestVgNum
-
2
:
ctgTestVgNum
;
}
void
ctgTestBuildCTableMetaOutput
(
STableMetaOutput
*
output
)
{
SName
cn
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
cn
.
dbname
,
"db1"
);
strcpy
(
cn
.
tname
,
ctgTestCTablename
);
SName
sn
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
sn
.
dbname
,
"db1"
);
strcpy
(
sn
.
tname
,
ctgTestSTablename
);
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
&
cn
,
tbFullName
);
output
->
metaNum
=
2
;
strcpy
(
output
->
ctbFname
,
tbFullName
);
tNameExtractFullName
(
&
cn
,
tbFullName
);
strcpy
(
output
->
tbFname
,
tbFullName
);
output
->
ctbMeta
.
vgId
=
9
;
output
->
ctbMeta
.
tableType
=
TSDB_CHILD_TABLE
;
output
->
ctbMeta
.
uid
=
3
;
output
->
ctbMeta
.
suid
=
2
;
output
->
tbMeta
=
(
STableMeta
*
)
calloc
(
1
,
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
(
ctgTestColNum
+
ctgTestColNum
));
output
->
tbMeta
->
vgId
=
9
;
output
->
tbMeta
->
tableType
=
TSDB_SUPER_TABLE
;
output
->
tbMeta
->
uid
=
2
;
output
->
tbMeta
->
suid
=
2
;
output
->
tbMeta
->
tableInfo
.
numOfColumns
=
ctgTestColNum
;
output
->
tbMeta
->
tableInfo
.
numOfTags
=
ctgTestTagNum
;
output
->
tbMeta
->
sversion
=
ctgTestSVersion
;
output
->
tbMeta
->
tversion
=
ctgTestTVersion
;
SSchema
*
s
=
NULL
;
s
=
&
output
->
tbMeta
->
schema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
1
;
s
->
bytes
=
8
;
strcpy
(
s
->
name
,
"ts"
);
s
=
&
output
->
tbMeta
->
schema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
2
;
s
->
bytes
=
4
;
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
output
->
tbMeta
->
schema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
3
;
s
->
bytes
=
12
;
strcpy
(
s
->
name
,
"tag1s"
);
}
void
ctgTestBuildDBVgroup
(
SDBVgroupInfo
*
dbVgroup
)
{
static
int32_t
vgVersion
=
ctgTestVgVersion
+
1
;
int32_t
vgNum
=
0
;
SVgroupInfo
vgInfo
=
{
0
};
dbVgroup
->
vgVersion
=
vgVersion
++
;
ctgTestCurrentVgVersion
=
dbVgroup
->
vgVersion
;
dbVgroup
->
hashMethod
=
0
;
dbVgroup
->
vgInfo
=
taosHashInit
(
ctgTestVgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
vgNum
=
ctgTestGetVgNumFromVgVersion
(
dbVgroup
->
vgVersion
);
uint32_t
hashUnit
=
UINT32_MAX
/
vgNum
;
for
(
int32_t
i
=
0
;
i
<
vgNum
;
++
i
)
{
vgInfo
.
vgId
=
i
+
1
;
vgInfo
.
hashBegin
=
i
*
hashUnit
;
vgInfo
.
hashEnd
=
hashUnit
*
(
i
+
1
)
-
1
;
vgInfo
.
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vgInfo
.
inUse
=
i
%
vgInfo
.
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vgInfo
.
numOfEps
;
++
n
)
{
SEpAddrMsg
*
addr
=
&
vgInfo
.
epAddr
[
n
];
strcpy
(
addr
->
fqdn
,
"a0"
);
addr
->
port
=
htons
(
n
+
22
);
}
taosHashPut
(
dbVgroup
->
vgInfo
,
&
vgInfo
.
vgId
,
sizeof
(
vgInfo
.
vgId
),
&
vgInfo
,
sizeof
(
vgInfo
));
}
}
void
ctgTestPrepareDbVgroups
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
SUseDbRsp
*
rspMsg
=
NULL
;
//todo
...
...
@@ -97,7 +212,8 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
SUseDbRsp
*
)
pRsp
->
pCont
;
strcpy
(
rspMsg
->
db
,
ctgTestDbname
);
rspMsg
->
vgVersion
=
htonl
(
1
);
rspMsg
->
vgVersion
=
htonl
(
ctgTestVgVersion
);
ctgTestCurrentVgVersion
=
ctgTestVgVersion
;
rspMsg
->
vgNum
=
htonl
(
ctgTestVgNum
);
rspMsg
->
hashMethod
=
0
;
...
...
@@ -148,13 +264,13 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1"
);
...
...
@@ -185,19 +301,19 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
colId
=
htonl
(
3
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
...
...
@@ -229,19 +345,19 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
colId
=
htonl
(
3
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
...
...
@@ -371,6 +487,117 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
void
*
ctgTestGetDbVgroupThread
(
void
*
param
)
{
struct
SCatalog
*
pCtg
=
(
struct
SCatalog
*
)
param
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SArray
*
vgList
=
NULL
;
int32_t
n
=
0
;
while
(
!
ctgTestStop
)
{
code
=
catalogGetDBVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
false
,
&
vgList
);
if
(
code
)
{
assert
(
0
);
}
if
(
vgList
)
{
taosArrayDestroy
(
vgList
);
}
if
(
ctgTestEnableSleep
)
{
usleep
(
rand
()
%
5
);
}
if
(
++
n
%
50000
==
0
)
{
printf
(
"Get:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
ctgTestSetDbVgroupThread
(
void
*
param
)
{
struct
SCatalog
*
pCtg
=
(
struct
SCatalog
*
)
param
;
int32_t
code
=
0
;
SDBVgroupInfo
dbVgroup
=
{
0
};
int32_t
n
=
0
;
while
(
!
ctgTestStop
)
{
ctgTestBuildDBVgroup
(
&
dbVgroup
);
code
=
catalogUpdateDBVgroup
(
pCtg
,
ctgTestDbname
,
&
dbVgroup
);
if
(
code
)
{
assert
(
0
);
}
if
(
ctgTestEnableSleep
)
{
usleep
(
rand
()
%
5
);
}
if
(
++
n
%
50000
==
0
)
{
printf
(
"Set:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
ctgTestGetCtableMetaThread
(
void
*
param
)
{
struct
SCatalog
*
pCtg
=
(
struct
SCatalog
*
)
param
;
int32_t
code
=
0
;
int32_t
n
=
0
;
STableMeta
*
tbMeta
=
NULL
;
int32_t
exist
=
0
;
SName
cn
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
cn
.
dbname
,
"db1"
);
strcpy
(
cn
.
tname
,
ctgTestCTablename
);
while
(
!
ctgTestStop
)
{
code
=
ctgGetTableMetaFromCache
(
pCtg
,
&
cn
,
&
tbMeta
,
&
exist
);
if
(
code
||
0
==
exist
)
{
assert
(
0
);
}
if
(
ctgTestEnableSleep
)
{
usleep
(
rand
()
%
5
);
}
if
(
++
n
%
50000
==
0
)
{
printf
(
"Get:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
ctgTestSetCtableMetaThread
(
void
*
param
)
{
struct
SCatalog
*
pCtg
=
(
struct
SCatalog
*
)
param
;
int32_t
code
=
0
;
SDBVgroupInfo
dbVgroup
=
{
0
};
int32_t
n
=
0
;
STableMetaOutput
output
=
{
0
};
ctgTestBuildCTableMetaOutput
(
&
output
);
while
(
!
ctgTestStop
)
{
code
=
ctgUpdateTableMetaCache
(
pCtg
,
&
output
);
if
(
code
)
{
assert
(
0
);
}
if
(
ctgTestEnableSleep
)
{
usleep
(
rand
()
%
5
);
}
if
(
++
n
%
50000
==
0
)
{
printf
(
"Set:%d
\n
"
,
n
);
}
}
return
NULL
;
}
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
...
...
@@ -388,7 +615,7 @@ TEST(tableMeta, normalTable) {
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
...
...
@@ -436,11 +663,13 @@ TEST(tableMeta, childTableCase) {
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
int32_t
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestCTablename);
...
...
@@ -494,11 +723,14 @@ TEST(tableMeta, superTableCase) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
...
...
@@ -558,12 +790,15 @@ TEST(tableDistVgroup, normalTable) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
...
...
@@ -595,7 +830,7 @@ TEST(tableDistVgroup, childTableCase) {
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestCTablename);
...
...
@@ -620,11 +855,14 @@ TEST(tableDistVgroup, superTableCase) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName
n
=
{.
type
=
T
_NAME_TABLE
,
.
acctId
=
1
};
SName n = {.type = T
SDB_TABLE_NAME_T
, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
...
...
@@ -645,6 +883,164 @@ TEST(tableDistVgroup, superTableCase) {
catalogDestroy();
}
TEST(dbVgroup, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.numOfEps, 3);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->numOfEps, 3);
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 7);
ASSERT_EQ(vgInfo.numOfEps, 2);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->numOfEps, 3);
taosArrayDestroy(vgList);
catalogDestroy();
}
#endif
TEST
(
multiThread
,
getSetDbVgroupCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
*
pvgInfo
=
NULL
;
SDBVgroupInfo
dbVgroup
=
{
0
};
SArray
*
vgList
=
NULL
;
ctgTestInitLogFile
();
ctgTestSetPrepareDbVgroups
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
SName
n
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
n
.
dbname
,
"db1"
);
strcpy
(
n
.
tname
,
ctgTestTablename
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
,
thread2
;
pthread_create
(
&
(
thread1
),
&
thattr
,
ctgTestSetDbVgroupThread
,
pCtg
);
sleep
(
1
);
pthread_create
(
&
(
thread1
),
&
thattr
,
ctgTestGetDbVgroupThread
,
pCtg
);
while
(
true
)
{
if
(
ctgTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
600
);
break
;
}
}
ctgTestStop
=
true
;
sleep
(
1
);
catalogDestroy
();
}
TEST
(
multiThread
,
ctableMeta
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
*
pvgInfo
=
NULL
;
SDBVgroupInfo
dbVgroup
=
{
0
};
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
SName
n
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
n
.
dbname
,
"db1"
);
strcpy
(
n
.
tname
,
ctgTestTablename
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
,
thread2
;
pthread_create
(
&
(
thread1
),
&
thattr
,
ctgTestGetCtableMetaThread
,
pCtg
);
pthread_create
(
&
(
thread1
),
&
thattr
,
ctgTestSetCtableMetaThread
,
pCtg
);
while
(
true
)
{
if
(
ctgTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
600
);
break
;
}
}
ctgTestStop
=
true
;
sleep
(
1
);
catalogDestroy
();
}
int
main
(
int
argc
,
char
**
argv
)
{
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
ccc91cec
...
...
@@ -89,12 +89,12 @@ typedef struct SSchJob {
SEpSet
dataSrcEps
;
SEpAddr
resEp
;
void
*
transport
;
SArray
*
qnodeList
;
SArray
*
nodeList
;
// qnode/vnode list, element is SQueryNodeAddr
tsem_t
rspSem
;
int32_t
userFetch
;
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
int32_t
resNumOfRows
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
ccc91cec
...
...
@@ -220,10 +220,10 @@ int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qnodeNum
=
taosArrayGetSize
(
job
->
q
nodeList
);
int32_t
nodeNum
=
taosArrayGetSize
(
job
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
q
nodeNum
&&
epSet
->
numOfEps
<
tListLen
(
epSet
->
port
);
++
i
)
{
SEpAddr
*
addr
=
taosArrayGet
(
job
->
q
nodeList
,
i
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
epSet
->
numOfEps
<
tListLen
(
epSet
->
port
);
++
i
)
{
SEpAddr
*
addr
=
taosArrayGet
(
job
->
nodeList
,
i
);
strncpy
(
epSet
->
fqdn
[
epSet
->
numOfEps
],
addr
->
fqdn
,
sizeof
(
addr
->
fqdn
));
epSet
->
port
[
epSet
->
numOfEps
]
=
addr
->
port
;
...
...
@@ -829,8 +829,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t
scheduleExecJobImpl
(
void
*
transport
,
SArray
*
q
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
bool
syncSchedule
)
{
if
(
qnodeList
&&
taosArrayGetSize
(
q
nodeList
)
<=
0
)
{
int32_t
scheduleExecJobImpl
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
bool
syncSchedule
)
{
if
(
nodeList
&&
taosArrayGetSize
(
nodeList
)
<=
0
)
{
qInfo
(
"qnodeList is empty"
);
}
...
...
@@ -842,7 +842,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag,
job
->
attr
.
syncSchedule
=
syncSchedule
;
job
->
transport
=
transport
;
job
->
qnodeList
=
q
nodeList
;
job
->
nodeList
=
nodeList
;
SCH_ERR_JRET
(
schValidateAndBuildJob
(
pDag
,
job
));
...
...
@@ -897,28 +897,27 @@ _return:
SCH_RET
(
code
);
}
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
uint64_t
*
numOfRow
s
)
{
if
(
NULL
==
transport
||
/* NULL ==
qnodeList || */
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
numOfRow
s
)
{
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
SQueryResult
*
pRe
s
)
{
if
(
NULL
==
transport
||
/* NULL ==
nodeList || */
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRe
s
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
*
numOfRows
=
0
;
SCH_ERR_RET
(
scheduleExecJobImpl
(
transport
,
qnodeList
,
pDag
,
pJob
,
true
));
SCH_ERR_RET
(
scheduleExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
true
));
SSchJob
*
job
=
*
(
SSchJob
**
)
pJob
;
*
numOfRows
=
job
->
resNumOfRows
;
pRes
->
code
=
job
->
errCode
;
pRes
->
numOfRows
=
job
->
resNumOfRows
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
q
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
q
nodeList
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
nodeList
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
return
scheduleExecJobImpl
(
transport
,
q
nodeList
,
pDag
,
pJob
,
false
);
return
scheduleExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
false
);
}
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
ccc91cec
...
...
@@ -321,10 +321,11 @@ TEST(insertTest, normalCase) {
pthread_t
thread1
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
pInsertJob
);
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pInsertJob
,
&
numOfRows
);
SQueryResult
res
=
{
0
};
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pInsertJob
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
numOfRows
,
20
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
scheduleFreeJob
(
pInsertJob
);
}
...
...
source/util/src/thash.c
浏览文件 @
ccc91cec
...
...
@@ -132,7 +132,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry*
}
else
{
pNewNode
->
next
=
pNode
;
pe
->
num
++
;
atomic_add_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_add_fetch_
32
(
&
pHashObj
->
size
,
1
);
}
return
pNewNode
;
...
...
@@ -209,7 +209,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
if
(
!
pHashObj
)
{
return
0
;
}
return
(
int32_t
)
atomic_load_
64
(
&
pHashObj
->
size
);
return
(
int32_t
)
atomic_load_
32
(
&
pHashObj
->
size
);
}
static
FORCE_INLINE
bool
taosHashTableEmpty
(
const
SHashObj
*
pHashObj
)
{
...
...
@@ -273,7 +273,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock
(
&
pHashObj
->
lock
,
pHashObj
->
type
);
atomic_add_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_add_fetch_
32
(
&
pHashObj
->
size
,
1
);
return
0
;
}
else
{
...
...
@@ -405,7 +405,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
}
if
(
acquire
)
{
pNode
->
count
++
;
atomic_add_fetch_16
(
&
pNode
->
count
,
1
)
;
}
data
=
GET_HASH_NODE_DATA
(
pNode
);
...
...
@@ -482,7 +482,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
// if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize);
pe
->
num
--
;
atomic_sub_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_sub_fetch_
32
(
&
pHashObj
->
size
,
1
);
FREE_HASH_NODE
(
pHashObj
,
pNode
);
}
}
...
...
@@ -520,7 +520,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
while
((
pNode
=
pEntry
->
next
)
!=
NULL
)
{
if
(
fp
&&
(
!
fp
(
param
,
GET_HASH_NODE_DATA
(
pNode
))))
{
pEntry
->
num
-=
1
;
atomic_sub_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_sub_fetch_
32
(
&
pHashObj
->
size
,
1
);
pEntry
->
next
=
pNode
->
next
;
...
...
@@ -546,7 +546,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
if
(
fp
&&
(
!
fp
(
param
,
GET_HASH_NODE_DATA
(
pNext
))))
{
pNode
->
next
=
pNext
->
next
;
pEntry
->
num
-=
1
;
atomic_sub_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_sub_fetch_
32
(
&
pHashObj
->
size
,
1
);
if
(
pEntry
->
num
==
0
)
{
assert
(
pEntry
->
next
==
NULL
);
...
...
@@ -600,7 +600,7 @@ void taosHashClear(SHashObj *pHashObj) {
pEntry
->
next
=
NULL
;
}
pHashObj
->
size
=
0
;
atomic_store_32
(
&
pHashObj
->
size
,
0
)
;
__wr_unlock
(
&
pHashObj
->
lock
,
pHashObj
->
type
);
}
...
...
@@ -847,7 +847,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
}
pe
->
num
--
;
atomic_sub_fetch_
64
(
&
pHashObj
->
size
,
1
);
atomic_sub_fetch_
32
(
&
pHashObj
->
size
,
1
);
FREE_HASH_NODE
(
pHashObj
,
pOld
);
}
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录