Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f17bf4bb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f17bf4bb
编写于
1月 06, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
283b5341
84a1e6e7
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
141 addition
and
242 deletion
+141
-242
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-17
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+0
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+47
-128
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+5
-2
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+1
-1
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+2
-3
source/libs/index/src/index.c
source/libs/index/src/index.c
+1
-2
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+16
-15
source/libs/index/src/index_fst_counting_writer.c
source/libs/index/src/index_fst_counting_writer.c
+1
-2
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+2
-2
source/libs/parser/src/dCDAstProcess.c
source/libs/parser/src/dCDAstProcess.c
+39
-25
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+4
-5
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+21
-32
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
f17bf4bb
...
...
@@ -50,7 +50,6 @@ typedef struct SCatalogCfg {
uint32_t
maxDBCacheNum
;
}
SCatalogCfg
;
int32_t
catalogInit
(
SCatalogCfg
*
cfg
);
/**
...
...
@@ -88,28 +87,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
*/
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
/**
* Get a super table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t
catalogGetSTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
);
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
);
/**
* Force renew a table's local cached meta data and get the new one.
...
...
@@ -118,10 +104,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
/**
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
f17bf4bb
...
...
@@ -64,14 +64,6 @@ typedef struct SCatalogMgmt {
typedef
uint32_t
(
*
tableNameHashFp
)(
const
char
*
,
uint32_t
);
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID)
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
f17bf4bb
...
...
@@ -105,8 +105,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
}
*
exist
=
1
;
tbMeta
=
*
pTableMeta
;
if
(
tbMeta
->
tableType
!=
TSDB_CHILD_TABLE
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -145,29 +143,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableTypeFromCache
(
struct
SCatalog
*
pCatalog
,
const
SName
*
pTableName
,
int32_t
*
tbType
)
{
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
return
TSDB_CODE_SUCCESS
;
}
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
pTableName
,
tbFullName
);
size_t
sz
=
0
;
STableMeta
*
pTableMeta
=
NULL
;
taosHashGetCloneExt
(
pCatalog
->
tableCache
.
cache
,
tbFullName
,
strlen
(
tbFullName
),
NULL
,
(
void
**
)
&
pTableMeta
,
&
sz
);
if
(
NULL
==
pTableMeta
)
{
return
TSDB_CODE_SUCCESS
;
}
*
tbType
=
pTableMeta
->
tableType
;
return
TSDB_CODE_SUCCESS
;
}
void
ctgGenEpSet
(
SEpSet
*
epSet
,
SVgroupInfo
*
vgroupInfo
)
{
epSet
->
inUse
=
0
;
epSet
->
numOfEps
=
vgroupInfo
->
numOfEps
;
...
...
@@ -178,7 +153,14 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
}
}
int32_t
ctgGetTableMetaFromMnodeImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
char
*
tbFullName
,
STableMetaOutput
*
output
)
{
int32_t
ctgGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
output
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
pTableName
,
tbFullName
);
SBuildTableMetaInput
bInput
=
{.
vgId
=
0
,
.
dbName
=
NULL
,
.
tableFullName
=
tbFullName
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
...
...
@@ -197,12 +179,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
output
->
metaNum
=
0
;
ctgDebug
(
"tbmeta:%s not exist in mnode"
,
tbFullName
);
return
TSDB_CODE_SUCCESS
;
}
ctgError
(
"error rsp for table meta, code:%x"
,
rpcRsp
.
code
);
CTG_ERR_RET
(
rpcRsp
.
code
);
}
...
...
@@ -212,13 +188,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMetaOutput
*
output
)
{
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
pTableName
,
tbFullName
);
return
ctgGetTableMetaFromMnodeImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
tbFullName
,
output
);
}
int32_t
ctgGetTableMetaFromVnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
vgroupInfo
||
NULL
==
output
)
{
...
...
@@ -228,7 +197,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
char
dbFullName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
pTableName
,
dbFullName
);
SBuildTableMetaInput
bInput
=
{.
vgId
=
vgroupInfo
->
vgId
,
.
dbName
=
dbFullName
,
.
tableFullName
=
(
char
*
)
pTableName
->
tname
};
SBuildTableMetaInput
bInput
=
{.
vgId
=
vgroupInfo
->
vgId
,
.
dbName
=
dbFullName
,
.
tableFullName
=
pTableName
->
tname
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
...
...
@@ -248,12 +217,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
rpcSendRecv
(
pRpc
,
&
epSet
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
output
->
metaNum
=
0
;
ctgDebug
(
"tbmeta:%s not exist in vnode"
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
ctgError
(
"error rsp for table meta, code:%x"
,
rpcRsp
.
code
);
CTG_ERR_RET
(
rpcRsp
.
code
);
}
...
...
@@ -348,6 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
if
(
NULL
==
vgInfo
)
{
ctgError
(
"no hash range found for hashvalue[%u]"
,
hashValue
);
void
*
pIter1
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter1
)
{
vgInfo
=
pIter1
;
ctgError
(
"valid range:[%d, %d], vgId:%d"
,
vgInfo
->
hashBegin
,
vgInfo
->
hashEnd
,
vgInfo
->
vgId
);
pIter1
=
taosHashIterate
(
dbInfo
->
vgInfo
,
pIter1
);
}
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -358,28 +329,22 @@ _return:
CTG_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
ctgGetTableMetaImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
int32_t
ctgGetTableMetaImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pTableMeta
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
int32_t
exist
=
0
;
if
(
!
forceUpdate
)
{
if
(
!
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCatalog
,
pTableName
,
pTableMeta
,
&
exist
));
if
(
exist
&&
CTG_TBTYPE_MATCH
(
isSTable
,
(
*
pTableMeta
)
->
tableType
)
)
{
if
(
exist
)
{
return
TSDB_CODE_SUCCESS
;
}
}
else
if
(
CTG_IS_UNKNOWN_STABLE
(
isSTable
))
{
int32_t
tbType
=
0
;
CTG_ERR_RET
(
ctgGetTableTypeFromCache
(
pCatalog
,
pTableName
,
&
tbType
));
CTG_SET_STABLE
(
isSTable
,
tbType
);
}
CTG_ERR_RET
(
c
tgRenewTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pTableName
,
isSTabl
e
));
CTG_ERR_RET
(
c
atalogRenewTableMeta
(
pCatalog
,
pRpc
,
pMgmtEps
,
pTableNam
e
));
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCatalog
,
pTableName
,
pTableMeta
,
&
exist
));
...
...
@@ -406,27 +371,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
tableCache
.
cache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
if
(
NULL
==
pCatalog
->
tableCache
.
stableCache
)
{
SHashObj
*
c
ache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
c
ache
)
{
pCatalog
->
tableCache
.
stableC
ache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
stableC
ache
)
{
ctgError
(
"init hash[%d] for stablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
tableCache
.
stableCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
if
(
output
->
metaNum
==
2
)
{
...
...
@@ -531,50 +488,6 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgRenewTableMetaImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pTransporter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
SVgroupInfo
vgroupInfo
=
{
0
};
int32_t
code
=
0
;
CTG_ERR_RET
(
catalogGetTableHashVgroup
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
));
STableMetaOutput
voutput
=
{
0
};
STableMetaOutput
moutput
=
{
0
};
STableMetaOutput
*
output
=
&
voutput
;
if
(
CTG_IS_STABLE
(
isSTable
))
{
CTG_ERR_JRET
(
ctgGetTableMetaFromMnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
moutput
));
if
(
0
==
moutput
.
metaNum
)
{
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
&
voutput
));
}
else
{
output
=
&
moutput
;
}
}
else
{
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
&
voutput
));
if
(
voutput
.
metaNum
>
0
&&
TSDB_SUPER_TABLE
==
voutput
.
tbMeta
->
tableType
)
{
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
voutput
.
tbFname
,
&
moutput
));
tfree
(
voutput
.
tbMeta
);
voutput
.
tbMeta
=
moutput
.
tbMeta
;
moutput
.
tbMeta
=
NULL
;
}
}
CTG_ERR_JRET
(
ctgUpdateTableMetaCache
(
pCatalog
,
output
));
_return:
tfree
(
voutput
.
tbMeta
);
tfree
(
moutput
.
tbMeta
);
CTG_RET
(
code
);
}
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
if
(
ctgMgmt
.
pCluster
)
{
...
...
@@ -737,15 +650,11 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
dbCache
.
cache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
else
{
CTG_ERR_JRET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbName
,
dbInfo
));
}
...
...
@@ -770,23 +679,34 @@ _return:
}
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
return
ctgGetTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
-
1
);
return
ctgGetTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
);
}
int32_t
catalogGetSTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
return
ctgGetTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
1
);
}
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pTransporter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
return
ctgRenewTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
isSTable
);
SVgroupInfo
vgroupInfo
=
{
0
};
int32_t
code
=
0
;
CTG_ERR_RET
(
catalogGetTableHashVgroup
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
));
STableMetaOutput
output
=
{
0
};
CTG_ERR_RET
(
ctgGetTableMetaFromVnode
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
&
output
));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET
(
ctgUpdateTableMetaCache
(
pCatalog
,
&
output
));
_return:
tfree
(
output
.
tbMeta
);
CTG_RET
(
code
);
}
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
return
ctgGetTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
true
,
pTableMeta
,
isSTable
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
return
ctgGetTableMetaImpl
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
true
,
pTableMeta
);
}
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SArray
**
pVgroupList
)
{
...
...
@@ -861,7 +781,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
dbInfo
,
pTableName
,
pVgroup
));
_return:
if
(
dbInfo
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
dbInfo
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
f17bf4bb
...
...
@@ -45,7 +45,7 @@ void ctgTestSetPrepareSTableMeta();
bool
ctgTestStop
=
false
;
bool
ctgTestEnableSleep
=
false
;
bool
ctgTestDeadLoop
=
fals
e
;
bool
ctgTestDeadLoop
=
tru
e
;
int32_t
ctgTestCurrentVgVersion
=
0
;
int32_t
ctgTestVgVersion
=
1
;
...
...
@@ -600,6 +600,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
}
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL;
...
...
@@ -767,7 +768,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tableMeta = NULL;
code
=
catalogRenewAndGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
tableMeta
,
0
);
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 9);
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
...
...
@@ -998,6 +999,8 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
#endif
TEST
(
multiThread
,
ctableMeta
)
{
struct
SCatalog
*
pCtg
=
NULL
;
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
f17bf4bb
...
...
@@ -115,7 +115,7 @@ typedef struct TFileCacheKey {
int32_t
nColName
;
}
ICacheKey
;
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
);
int
indexFlushCacheT
oT
File
(
SIndex
*
sIdx
,
void
*
);
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
f17bf4bb
...
...
@@ -21,9 +21,8 @@
#include "tskiplist.h"
// ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below:
* content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
/* A data row, the format is like below
* content: |<---colVal---->|<-- version--->|<-- uid--->|<-- colType --->|<--operaType--->|
*/
#ifdef __cplusplus
...
...
source/libs/index/src/index.c
浏览文件 @
f17bf4bb
...
...
@@ -384,7 +384,6 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
}
}
else
{
taosArrayPush
(
result
,
&
tv
);
// indexError("merge colVal: %s", tv->colVal);
}
}
static
void
indexDestroyTempResult
(
SArray
*
result
)
{
...
...
@@ -395,7 +394,7 @@ static void indexDestroyTempResult(SArray* result) {
}
taosArrayDestroy
(
result
);
}
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
cache
)
{
int
indexFlushCacheT
oT
File
(
SIndex
*
sIdx
,
void
*
cache
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexInfo
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
...
...
source/libs/index/src/index_cache.c
浏览文件 @
f17bf4bb
...
...
@@ -27,9 +27,9 @@
static
void
indexMemRef
(
MemTable
*
tbl
);
static
void
indexMemUnRef
(
MemTable
*
tbl
);
static
void
c
acheTermDestroy
(
CacheTerm
*
ct
);
static
char
*
getIndexKey
(
const
void
*
pData
);
static
int32_t
compareKey
(
const
void
*
l
,
const
void
*
r
);
static
void
indexC
acheTermDestroy
(
CacheTerm
*
ct
);
static
int32_t
indexCacheTermCompare
(
const
void
*
l
,
const
void
*
r
);
static
char
*
indexCacheTermGet
(
const
void
*
pData
);
static
MemTable
*
indexInternalCacheCreate
(
int8_t
type
);
...
...
@@ -45,9 +45,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
return
NULL
;
};
cache
->
mem
=
indexInternalCacheCreate
(
type
);
cache
->
colName
=
calloc
(
1
,
strlen
(
colName
)
+
1
);
memcpy
(
cache
->
colName
,
colName
,
strlen
(
colName
));
cache
->
colName
=
tstrdup
(
colName
);
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
...
...
@@ -187,8 +185,8 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break
;
}
else
if
(
cache
->
imm
!=
NULL
)
{
// TODO: wake up by condition variable
// pthread_mutex_unlock(&cache->mtx);
pthread_cond_wait
(
&
cache
->
finished
,
&
cache
->
mtx
);
// pthread_mutex_unlock(&cache->mtx);
// taosMsleep(50);
// pthread_mutex_lock(&cache->mtx);
}
else
{
...
...
@@ -243,7 +241,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
static
int
indexQueryMem
(
MemTable
*
mem
,
CacheTerm
*
ct
,
EIndexQueryType
qtype
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
mem
==
NULL
)
{
return
0
;
}
char
*
key
=
getIndexKey
(
ct
);
char
*
key
=
indexCacheTermGet
(
ct
);
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
mem
->
mem
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
while
(
tSkipListIterNext
(
iter
))
{
...
...
@@ -321,17 +319,16 @@ void indexMemUnRef(MemTable* tbl) {
}
}
static
void
c
acheTermDestroy
(
CacheTerm
*
ct
)
{
static
void
indexC
acheTermDestroy
(
CacheTerm
*
ct
)
{
if
(
ct
==
NULL
)
{
return
;
}
free
(
ct
->
colVal
);
free
(
ct
);
}
static
char
*
getIndexKey
(
const
void
*
pData
)
{
static
char
*
indexCacheTermGet
(
const
void
*
pData
)
{
CacheTerm
*
p
=
(
CacheTerm
*
)
pData
;
return
(
char
*
)
p
;
}
static
int32_t
compareKey
(
const
void
*
l
,
const
void
*
r
)
{
static
int32_t
indexCacheTermCompare
(
const
void
*
l
,
const
void
*
r
)
{
CacheTerm
*
lt
=
(
CacheTerm
*
)
l
;
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
...
...
@@ -345,7 +342,8 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
MemTable
*
tbl
=
calloc
(
1
,
sizeof
(
MemTable
));
indexMemRef
(
tbl
);
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
tbl
->
mem
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
type
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
tbl
->
mem
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
type
,
MAX_INDEX_KEY_LEN
,
indexCacheTermCompare
,
SL_ALLOW_DUP_KEY
,
indexCacheTermGet
);
}
return
tbl
;
}
...
...
@@ -353,7 +351,7 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
static
void
doMergeWork
(
SSchedMsg
*
msg
)
{
IndexCache
*
pCache
=
msg
->
ahandle
;
SIndex
*
sidx
=
(
SIndex
*
)
pCache
->
index
;
indexFlushCacheTFile
(
sidx
,
pCache
);
indexFlushCacheT
oT
File
(
sidx
,
pCache
);
}
static
bool
indexCacheIteratorNext
(
Iterate
*
itera
)
{
SSkipListIterator
*
iter
=
itera
->
iter
;
...
...
@@ -375,4 +373,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
return
next
;
}
static
IterateValue
*
indexCacheIteratorGetValue
(
Iterate
*
iter
)
{
return
&
iter
->
val
;
}
static
IterateValue
*
indexCacheIteratorGetValue
(
Iterate
*
iter
)
{
// opt later
return
&
iter
->
val
;
}
source/libs/index/src/index_fst_counting_writer.c
浏览文件 @
f17bf4bb
...
...
@@ -18,8 +18,6 @@
#include "tutil.h"
static
int
writeCtxDoWrite
(
WriterCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
// if (ctx->offset + len > ctx->limit) { return -1; }
if
(
ctx
->
type
==
TFile
)
{
assert
(
len
==
tfWrite
(
ctx
->
file
.
fd
,
buf
,
len
));
}
else
{
...
...
@@ -125,6 +123,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if
(
ctx
->
type
==
TMemory
)
{
free
(
ctx
->
mem
.
buf
);
}
else
{
ctx
->
flush
(
ctx
);
tfClose
(
ctx
->
file
.
fd
);
if
(
ctx
->
file
.
readOnly
)
{
#ifdef USE_MMAP
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
f17bf4bb
...
...
@@ -48,7 +48,7 @@ class FstWriter {
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
,
const
std
::
string
&
fileName
=
fileName
)
{
FstReadMemory
(
size_t
size
,
const
std
::
string
&
fileName
=
"/tmp/tindex.tindex"
)
{
tfInit
();
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_w
=
fstCountingWriterCreate
(
_wc
);
...
...
@@ -307,7 +307,7 @@ void validateTFile(char* arg) {
tfCleanup
();
}
int
main
(
int
argc
,
char
*
argv
[])
{
// tool to check all kind of fst test
// tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); }
// checkFstCheckIterator();
// checkFstLongTerm();
...
...
source/libs/parser/src/dCDAstProcess.c
浏览文件 @
f17bf4bb
...
...
@@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
code
=
parseValueToken
(
&
endPtr
,
pItem
,
pSchema
,
tsPrecision
,
tmpTokenBuf
,
KvRowAppend
,
&
param
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
pKvRowBuilder
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
}
...
...
@@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
const
char
*
msg3
=
"tag value too long"
;
const
char
*
msg4
=
"illegal value or data overflow"
;
int32_t
code
=
0
;
STableMeta
*
pSuperTableMeta
=
NULL
;
SHashObj
*
pVgroupHashmap
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
// super table name, create table by using dst
...
...
@@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SCreatedTableInfo
*
pCreateTableInfo
=
taosArrayGet
(
pCreateTable
->
childTableInfo
,
j
);
SToken
*
pSTableNameToken
=
&
pCreateTableInfo
->
stbName
;
int32_t
code
=
parserValidateNameToken
(
pSTableNameToken
);
code
=
parserValidateNameToken
(
pSTableNameToken
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
goto
_error
;
}
SName
name
=
{
0
};
code
=
createSName
(
&
name
,
pSTableNameToken
,
pCtx
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_error
;
}
SKVRowBuilder
kvRowBuilder
=
{
0
};
if
(
tdInitKVRowBuilder
(
&
kvRowBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
SArray
*
pValList
=
pCreateTableInfo
->
pTagVals
;
size_t
numOfInputTag
=
taosArrayGetSize
(
pValList
);
STableMeta
*
pSuperTableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
name
,
&
pSuperTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_error
;
}
assert
(
pSuperTableMeta
!=
NULL
);
...
...
@@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if
(
numOfInputTag
!=
numOfBoundTags
||
schemaSize
<
numOfInputTag
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
goto
_error
;
}
bool
findColumnIndex
=
false
;
...
...
@@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
pItem
->
pVar
.
nLen
>
pSchema
->
bytes
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
goto
_error
;
}
}
else
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
if
(
pItem
->
pVar
.
nType
==
TSDB_DATA_TYPE_BINARY
)
{
...
...
@@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
code
=
taosVariantDump
(
&
(
pItem
->
pVar
),
tagVal
,
pSchema
->
type
,
true
);
// check again after the convert since it may be converted from binary to nchar.
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
)
{
int16_t
len
=
varDataTLen
(
tagVal
);
if
(
len
>
pSchema
->
bytes
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
goto
_error
;
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
);
goto
_error
;
}
tdAddColToKVRow
(
&
kvRowBuilder
,
pSchema
->
colId
,
pSchema
->
type
,
tagVal
);
...
...
@@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
}
else
{
if
(
schemaSize
!=
numOfInputTag
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
goto
_error
;
}
code
=
doParseSerializeTagValue
(
pTagSchema
,
numOfInputTag
,
&
kvRowBuilder
,
pValList
,
tinfo
.
precision
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
code
;
goto
_error
;
}
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
kvRowBuilder
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
if
(
row
==
NULL
)
{
tfree
(
pSuperTableMeta
)
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_error
;
}
tdSortKVRowByColIdx
(
row
);
...
...
@@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SName
tableName
=
{
0
};
code
=
createSName
(
&
tableName
,
&
pCreateTableInfo
->
name
,
pCtx
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pSuperTableMeta
);
return
code
;
goto
_error
;
}
// Find a appropriate vgroup to accommodate this table , according to the table name
SVgroupInfo
info
=
{
0
};
catalogGetTableHashVgroup
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
tableName
,
&
info
);
code
=
catalogGetTableHashVgroup
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
tableName
,
&
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
addCreateTbReqIntoVgroup
(
pVgroupHashmap
,
&
tableName
,
row
,
pSuperTableMeta
->
uid
,
&
info
);
tfree
(
pSuperTableMeta
);
}
*
pBufArray
=
doSerializeVgroupCreateTableInfo
(
pVgroupHashmap
);
if
(
*
pBufArray
==
NULL
)
{
code
=
terrno
;
goto
_error
;
}
taosHashCleanup
(
pVgroupHashmap
);
return
TSDB_CODE_SUCCESS
;
_error:
taosHashCleanup
(
pVgroupHashmap
);
tfree
(
pSuperTableMeta
);
terrno
=
code
;
return
code
;
}
static
int32_t
serializeVgroupTablesBatchImpl
(
SVgroupTablesBatch
*
pTbBatch
,
SArray
*
pBufArray
)
{
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
f17bf4bb
...
...
@@ -67,8 +67,8 @@ typedef struct SSchTask {
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
SQueryNodeAddr
execAddr
;
// task actual executed node address
int8_t
c
o
ndidateIdx
;
// current try condidation index
SArray
*
c
o
ndidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
int8_t
c
a
ndidateIdx
;
// current try condidation index
SArray
*
c
a
ndidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SQueryProfileSummary
summary
;
// task execution summary
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
...
...
@@ -115,9 +115,8 @@ typedef struct SSchJob {
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
f17bf4bb
...
...
@@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
}
static
void
cleanupTask
(
SSchTask
*
pTask
)
{
taosArrayDestroy
(
pTask
->
c
o
ndidateAddrs
);
taosArrayDestroy
(
pTask
->
c
a
ndidateAddrs
);
}
int32_t
schValidateAndBuildJob
(
SQueryDag
*
dag
,
SSchJob
*
pJob
)
{
...
...
@@ -226,20 +226,20 @@ _return:
SCH_RET
(
code
);
}
int32_t
schSetTaskC
o
ndidateAddrs
(
SSchJob
*
job
,
SSchTask
*
task
)
{
if
(
task
->
c
o
ndidateAddrs
)
{
int32_t
schSetTaskC
a
ndidateAddrs
(
SSchJob
*
job
,
SSchTask
*
task
)
{
if
(
task
->
c
a
ndidateAddrs
)
{
return
TSDB_CODE_SUCCESS
;
}
task
->
c
o
ndidateIdx
=
0
;
task
->
c
o
ndidateAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
task
->
c
o
ndidateAddrs
)
{
task
->
c
a
ndidateIdx
=
0
;
task
->
c
a
ndidateAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
task
->
c
a
ndidateAddrs
)
{
qError
(
"taosArrayInit failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
task
->
plan
->
execNode
.
numOfEps
>
0
)
{
if
(
NULL
==
taosArrayPush
(
task
->
c
o
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
if
(
NULL
==
taosArrayPush
(
task
->
c
a
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CONDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
job
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
task
->
c
o
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
if
(
NULL
==
taosArrayPush
(
task
->
c
a
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -280,8 +280,6 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"push to %s list"
,
"execTasks"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -296,8 +294,6 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"push to %s list"
,
"succTasks"
);
*
moved
=
true
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -313,8 +309,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"push to %s list"
,
"failTasks"
);
*
moved
=
true
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -389,7 +383,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET
(
schMoveTaskToSuccList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
SCH_TASK_E
LOG
(
"
task may already moved, status:%d"
,
task
->
status
);
SCH_TASK_E
RR_LOG
(
"
task may already moved, status:%d"
,
task
->
status
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -463,11 +457,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
SCH_ERR_RET
(
schTaskCheckAndSetRetry
(
job
,
task
,
errCode
,
&
needRetry
));
if
(
!
needRetry
)
{
SCH_TASK_ELOG
(
"task failed[%x], no more retry"
,
errCode
);
SCH_TASK_E
RR_
LOG
(
"task failed[%x], no more retry"
,
errCode
);
SCH_ERR_RET
(
schMoveTaskToFailList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
SCH_TASK_ELOG
(
"task may already moved, status:%d"
,
task
->
status
);
SCH_TASK_E
RR_
LOG
(
"task may already moved, status:%d"
,
task
->
status
);
}
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
...
...
@@ -506,6 +500,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
goto
_task_error
;
}
}
break
;
}
case
TDMT_VND_SUBMIT_RSP
:
{
...
...
@@ -582,25 +577,19 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t
code
=
0
;
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
SSchJob
**
p
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
pjob
||
NULL
==
(
*
p
job
))
{
SSchJob
**
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
job
||
NULL
==
(
*
job
))
{
qError
(
"taosHashGet queryId:%"
PRIx64
" not exist"
,
pParam
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchJob
*
job
=
*
pjob
;
SSchTask
**
ptask
=
taosHashGet
(
job
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
ptask
||
NULL
==
(
*
ptask
))
{
SSchTask
**
task
=
taosHashGet
((
*
job
)
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"taosHashGet taskId:%"
PRIx64
" not exist"
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
*
task
=
*
ptask
;
SCH_TASK_DLOG
(
"Got msg:%d, rspCode:%d"
,
msgType
,
rspCode
);
schProcessRspMsg
(
job
,
task
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
schProcessRspMsg
(
*
job
,
*
task
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
_return:
tfree
(
param
);
...
...
@@ -809,7 +798,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
SEpSet
epSet
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
task
->
c
ondidateAddrs
,
task
->
co
ndidateIdx
);
SQueryNodeAddr
*
addr
=
taosArrayGet
(
task
->
c
andidateAddrs
,
task
->
ca
ndidateIdx
);
schConvertAddrToEpSet
(
addr
,
&
epSet
);
...
...
@@ -827,10 +816,10 @@ _return:
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
)
{
SSubplan
*
plan
=
task
->
plan
;
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
task
->
msgLen
));
SCH_ERR_RET
(
schSetTaskC
o
ndidateAddrs
(
job
,
task
));
SCH_ERR_RET
(
schSetTaskC
a
ndidateAddrs
(
job
,
task
));
if
(
NULL
==
task
->
c
ondidateAddrs
||
taosArrayGetSize
(
task
->
co
ndidateAddrs
)
<=
0
)
{
SCH_TASK_ELOG
(
"no valid condidate node for task:%"
PRIx64
,
task
->
taskId
);
if
(
NULL
==
task
->
c
andidateAddrs
||
taosArrayGetSize
(
task
->
ca
ndidateAddrs
)
<=
0
)
{
SCH_TASK_E
RR_
LOG
(
"no valid condidate node for task:%"
PRIx64
,
task
->
taskId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录