Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a3ba58e0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a3ba58e0
编写于
10月 19, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-19223-D
上级
89c0f761
09a77b3b
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
86 addition
and
18 deletion
+86
-18
docs/zh/07-develop/07-tmq.mdx
docs/zh/07-develop/07-tmq.mdx
+12
-1
include/common/tmsg.h
include/common/tmsg.h
+3
-0
source/dnode/vnode/src/meta/metaEntry.c
source/dnode/vnode/src/meta/metaEntry.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+2
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+53
-9
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+6
-0
source/libs/tdb/src/db/tdbDb.c
source/libs/tdb/src/db/tdbDb.c
+6
-3
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+1
-1
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+2
-2
未找到文件。
docs/zh/07-develop/07-tmq.mdx
浏览文件 @
a3ba58e0
...
...
@@ -420,7 +420,18 @@ let mut consumer = tmq.build()?;
<TabItem value="Python" label="Python">
Python 使用以下配置项创建一个 Consumer 实例。
Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例:
```python
from taos.tmq import TaosConsumer
# Syntax: `consumer = TaosConsumer(*topics, **args)`
#
# Example:
consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local")
```
其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
...
...
include/common/tmsg.h
浏览文件 @
a3ba58e0
...
...
@@ -343,6 +343,8 @@ typedef struct {
}
SSchemaWrapper
;
static
FORCE_INLINE
SSchemaWrapper
*
tCloneSSchemaWrapper
(
const
SSchemaWrapper
*
pSchemaWrapper
)
{
if
(
pSchemaWrapper
->
pSchema
==
NULL
)
return
NULL
;
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosMemoryMalloc
(
sizeof
(
SSchemaWrapper
));
if
(
pSW
==
NULL
)
return
pSW
;
pSW
->
nCols
=
pSchemaWrapper
->
nCols
;
...
...
@@ -352,6 +354,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
taosMemoryFree
(
pSW
);
return
NULL
;
}
memcpy
(
pSW
->
pSchema
,
pSchemaWrapper
->
pSchema
,
pSW
->
nCols
*
sizeof
(
SSchema
));
return
pSW
;
}
...
...
source/dnode/vnode/src/meta/metaEntry.c
浏览文件 @
a3ba58e0
...
...
@@ -21,7 +21,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if
(
tEncodeI64
(
pCoder
,
pME
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pME
->
type
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pME
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pME
->
name
)
<
0
)
return
-
1
;
if
(
pME
->
name
==
NULL
||
tEncodeCStr
(
pCoder
,
pME
->
name
)
<
0
)
return
-
1
;
if
(
pME
->
type
==
TSDB_SUPER_TABLE
)
{
if
(
tEncodeI8
(
pCoder
,
pME
->
flags
)
<
0
)
return
-
1
;
// TODO: need refactor?
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
a3ba58e0
...
...
@@ -505,6 +505,7 @@ typedef struct SCtgOperation {
#define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_SYS_DB 0x8
#define CTG_FLAG_FORCE_UPDATE 0x10
#define CTG_FLAG_ONLY_CACHE 0x20
#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v))
...
...
@@ -783,7 +784,7 @@ void ctgFreeQNode(SCtgQNode* node);
void
ctgClearHandle
(
SCatalog
*
pCtg
);
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
);
int32_t
ctgRemoveTbMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
);
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
SName
*
ctgGetFetchName
(
SArray
*
pNames
,
SCtgFetch
*
pFetch
);
extern
SCatalogMgmt
gCtgMgmt
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
a3ba58e0
...
...
@@ -23,12 +23,21 @@
SCatalogMgmt
gCtgMgmt
=
{
0
};
int32_t
ctgGetDBVgInfo
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
dbFName
,
SCtgDBCache
**
dbCache
,
SDBVgInfo
**
pInfo
)
{
SDBVgInfo
**
pInfo
,
bool
*
exists
)
{
int32_t
code
=
0
;
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
dbCache
));
if
(
*
dbCache
)
{
if
(
exists
)
{
*
exists
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
if
(
exists
)
{
*
exists
=
false
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -94,7 +103,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
int32_t
code
=
0
;
if
(
!
CTG_FLAG_IS_SYS_DB
(
ctx
->
flag
))
{
CTG_ERR_RET
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
ctx
->
pName
,
&
vgroupInfo
));
CTG_ERR_RET
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
ctx
->
pName
,
&
vgroupInfo
,
NULL
));
}
STableMetaOutput
moutput
=
{
0
};
...
...
@@ -194,7 +203,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
STableMetaOutput
*
output
=
NULL
;
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
pCtg
,
pConn
,
ctx
,
pTableMeta
));
if
(
*
pTableMeta
)
{
if
(
*
pTableMeta
||
(
ctx
->
flag
&
CTG_FLAG_ONLY_CACHE
)
)
{
goto
_return
;
}
...
...
@@ -415,7 +424,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
CTG_ERR_RET
(
ctgGetTableCfgFromMnode
(
pCtg
,
pConn
,
pTableName
,
pCfg
,
NULL
));
}
else
{
SVgroupInfo
vgroupInfo
=
{
0
};
CTG_ERR_RET
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
&
vgroupInfo
));
CTG_ERR_RET
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
&
vgroupInfo
,
NULL
));
CTG_ERR_RET
(
ctgGetTableCfgFromVnode
(
pCtg
,
pConn
,
pTableName
,
&
vgroupInfo
,
pCfg
,
NULL
));
}
...
...
@@ -441,7 +450,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl
tNameGetFullDbName
(
pTableName
,
db
);
SHashObj
*
vgHash
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
,
NULL
));
if
(
dbCache
)
{
vgHash
=
dbCache
->
vgCache
.
vgInfo
->
vgHash
;
...
...
@@ -501,7 +510,7 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
)
{
if
(
IS_SYS_DBNAME
(
pTableName
->
dbname
))
{
ctgError
(
"no valid vgInfo for db, dbname:%s"
,
pTableName
->
dbname
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
...
...
@@ -513,8 +522,13 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
tNameGetFullDbName
(
pTableName
,
db
);
SDBVgInfo
*
vgInfo
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
db
,
&
dbCache
,
&
vgInfo
,
exists
));
if
(
exists
&&
false
==
*
exists
)
{
ctgDebug
(
"db %s vgInfo not in cache"
,
pTableName
->
dbname
);
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
vgInfo
?
vgInfo
:
dbCache
->
vgCache
.
vgInfo
,
pTableName
,
pVgroup
));
_return:
...
...
@@ -737,7 +751,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
SArray
*
vgList
=
NULL
;
SHashObj
*
vgHash
=
NULL
;
SDBVgInfo
*
vgInfo
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
dbFName
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pConn
,
dbFName
,
&
dbCache
,
&
vgInfo
,
NULL
));
if
(
dbCache
)
{
vgHash
=
dbCache
->
vgCache
.
vgInfo
->
vgHash
;
}
else
{
...
...
@@ -880,6 +894,17 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
SCtgTbMetaCtx
ctx
=
{
0
};
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
flag
=
CTG_FLAG_UNKNOWN_STB
|
CTG_FLAG_ONLY_CACHE
;
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
int32_t
catalogGetSTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
...
...
@@ -891,6 +916,18 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
int32_t
catalogGetCachedSTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
SCtgTbMetaCtx
ctx
=
{
0
};
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
flag
=
CTG_FLAG_STB
|
CTG_FLAG_ONLY_CACHE
;
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
int32_t
catalogUpdateTableMeta
(
SCatalog
*
pCtg
,
STableMetaRsp
*
pMsg
)
{
CTG_API_ENTER
();
...
...
@@ -1009,7 +1046,14 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
SVgroupInfo
*
pVgroup
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
));
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
,
NULL
));
}
int32_t
catalogGetCachedTableHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
,
exists
));
}
int32_t
catalogGetAllMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
)
{
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
a3ba58e0
...
...
@@ -1094,6 +1094,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// fetch next ofp, a new ofp and make it dirty
ret
=
tdbFetchOvflPage
(
&
pgno
,
&
nextOfp
,
pTxn
,
pBt
);
if
(
ret
<
0
)
{
tdbFree
(
pBuf
);
return
-
1
;
}
}
...
...
@@ -1135,6 +1136,11 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
memcpy
(
pBuf
,
((
SCell
*
)
pVal
)
+
vLen
-
nLeft
,
bytes
);
memcpy
(
pBuf
+
bytes
,
&
pgno
,
sizeof
(
pgno
));
if
(
ofp
==
NULL
)
{
tdbFree
(
pBuf
);
return
-
1
;
}
ret
=
tdbPageInsertCell
(
ofp
,
0
,
pBuf
,
bytes
+
sizeof
(
pgno
),
0
);
if
(
ret
<
0
)
{
tdbFree
(
pBuf
);
...
...
source/libs/tdb/src/db/tdbDb.c
浏览文件 @
a3ba58e0
...
...
@@ -106,7 +106,8 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
for
(
pPager
=
pDb
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerBegin
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
tdbError
(
"failed to begin pager since %s. dbName:%s, txnId:%ld"
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
tdbError
(
"failed to begin pager since %s. dbName:%s, txnId:%"
PRId64
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
return
-
1
;
}
}
...
...
@@ -121,7 +122,8 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
for
(
pPager
=
pDb
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerCommit
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
tdbError
(
"failed to commit pager since %s. dbName:%s, txnId:%ld"
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
tdbError
(
"failed to commit pager since %s. dbName:%s, txnId:%"
PRId64
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
return
-
1
;
}
}
...
...
@@ -151,7 +153,8 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
for
(
pPager
=
pDb
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerAbort
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
tdbError
(
"failed to abort pager since %s. dbName:%s, txnId:%ld"
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
tdbError
(
"failed to abort pager since %s. dbName:%s, txnId:%"
PRId64
,
tstrerror
(
terrno
),
pDb
->
dbName
,
pTxn
->
txnId
);
return
-
1
;
}
}
...
...
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
a3ba58e0
...
...
@@ -105,7 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
for
(
int32_t
iPage
=
pCache
->
nPages
;
iPage
<
nPage
;
iPage
++
)
{
if
(
tdbPageCreate
(
pCache
->
szPage
,
&
aPage
[
iPage
],
tdbDefaultMalloc
,
NULL
)
<
0
)
{
// TODO: handle error
tdbOsFree
(
pCache
->
aPage
);
tdbOsFree
(
aPage
);
return
-
1
;
}
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
a3ba58e0
...
...
@@ -575,7 +575,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
offset
=
(
i64
)
pPage
->
pageSize
*
(
TDB_PAGE_PGNO
(
pPage
)
-
1
);
if
(
tdbOsLSeek
(
pPager
->
fd
,
offset
,
SEEK_SET
)
<
0
)
{
tdbError
(
"failed to lseek due to %s. file:%s, offset:%
ld"
,
strerror
(
errno
),
pPager
->
dbFileName
,
offset
);
tdbError
(
"failed to lseek due to %s. file:%s, offset:%
"
PRId64
,
strerror
(
errno
),
pPager
->
dbFileName
,
offset
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
...
...
@@ -630,7 +630,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
i64
offset
=
pPager
->
pageSize
*
(
pgno
-
1
);
if
(
tdbOsLSeek
(
pPager
->
fd
,
offset
,
SEEK_SET
)
<
0
)
{
tdbError
(
"failed to lseek fd due to %s. file:%s, offset:%
ld"
,
strerror
(
errno
),
pPager
->
dbFileName
,
offset
);
tdbError
(
"failed to lseek fd due to %s. file:%s, offset:%
"
PRId64
,
strerror
(
errno
),
pPager
->
dbFileName
,
offset
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tdbOsFree
(
pageBuf
);
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录