diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index a617935bbf8a6047c4614e3cf2217c9394948b45..1f5a089aaa2a051e238eedc0315c37cad643b33f 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -420,7 +420,18 @@ let mut consumer = tmq.build()?; -Python 使用以下配置项创建一个 Consumer 实例。 +Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例: + +```python +from taos.tmq import TaosConsumer + +# Syntax: `consumer = TaosConsumer(*topics, **args)` +# +# Example: +consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local") +``` + +其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置: | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e25fbe6201fcd6c51bff78f3e098a3fb9d0b60b4..42bdc3af16761b1d6a06cd7fc4bdadc05b5546fa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -343,6 +343,8 @@ typedef struct { } SSchemaWrapper; static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { + if (pSchemaWrapper->pSchema == NULL) return NULL; + SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; @@ -352,6 +354,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p taosMemoryFree(pSW); return NULL; } + memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); return pSW; } diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index b7bdadec03a184a8debc7c2f9a0366fa1ec80734..72f7365a1e3304c8fa5c387eea24a968ebb87c14 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -21,7 +21,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tEncodeI64(pCoder, pME->version) < 0) return -1; if (tEncodeI8(pCoder, pME->type) < 0) return -1; if (tEncodeI64(pCoder, pME->uid) < 0) return -1; - if (tEncodeCStr(pCoder, pME->name) < 0) return -1; + if (pME->name == NULL || tEncodeCStr(pCoder, pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor? diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index fe71987103aedbee555b67e99c95a7fb6fb2fb0e..0786321686a5ae6f5b7f7937f4c5ec425f6d6c31 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -505,6 +505,7 @@ typedef struct SCtgOperation { #define CTG_FLAG_UNKNOWN_STB 0x4 #define CTG_FLAG_SYS_DB 0x8 #define CTG_FLAG_FORCE_UPDATE 0x10 +#define CTG_FLAG_ONLY_CACHE 0x20 #define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v)) @@ -783,7 +784,7 @@ void ctgFreeQNode(SCtgQNode* node); void ctgClearHandle(SCatalog* pCtg); void ctgFreeTbCacheImpl(SCtgTbCache* pCache); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); -int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup); +int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 218a86ed5cad6979afd483394906969df5ff0dc0..5997f63a16e0d669467a18a53eb84e9e6bcfebd9 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -23,12 +23,21 @@ SCatalogMgmt gCtgMgmt = {0}; int32_t ctgGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SCtgDBCache** dbCache, - SDBVgInfo** pInfo) { + SDBVgInfo** pInfo, bool* exists) { int32_t code = 0; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache)); if (*dbCache) { + if (exists) { + *exists = true; + } + + return TSDB_CODE_SUCCESS; + } + + if (exists) { + *exists = false; return TSDB_CODE_SUCCESS; } @@ -94,7 +103,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* int32_t code = 0; if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) { - CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo)); + CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo, NULL)); } STableMetaOutput moutput = {0}; @@ -194,7 +203,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx STableMetaOutput* output = NULL; CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, ctx, pTableMeta)); - if (*pTableMeta) { + if (*pTableMeta || (ctx->flag & CTG_FLAG_ONLY_CACHE)) { goto _return; } @@ -415,7 +424,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL)); } else { SVgroupInfo vgroupInfo = {0}; - CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo)); + CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo, NULL)); CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL)); } @@ -441,7 +450,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl tNameGetFullDbName(pTableName, db); SHashObj* vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, NULL)); if (dbCache) { vgHash = dbCache->vgCache.vgInfo->vgHash; @@ -501,7 +510,7 @@ _return: CTG_RET(code); } -int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup) { +int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) { if (IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -513,8 +522,13 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* tNameGetFullDbName(pTableName, db); SDBVgInfo* vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, exists)); + if (exists && false == *exists) { + ctgDebug("db %s vgInfo not in cache", pTableName->dbname); + return TSDB_CODE_SUCCESS; + } + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup)); _return: @@ -737,7 +751,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* SArray* vgList = NULL; SHashObj* vgHash = NULL; SDBVgInfo* vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL)); if (dbCache) { vgHash = dbCache->vgCache.vgInfo->vgHash; } else { @@ -880,6 +894,17 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); } +int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) { + CTG_API_ENTER(); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_UNKNOWN_STB | CTG_FLAG_ONLY_CACHE; + + CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); +} + + int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); @@ -891,6 +916,18 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); } +int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + STableMeta** pTableMeta) { + CTG_API_ENTER(); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_STB | CTG_FLAG_ONLY_CACHE; + + CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); +} + + int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) { CTG_API_ENTER(); @@ -1009,7 +1046,14 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SVgroupInfo* pVgroup) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup)); + CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL)); +} + +int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + SVgroupInfo* pVgroup, bool* exists) { + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, exists)); } int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 8f11fb88a2399f84af8b32e41fb9477c2d61485c..ba0b3dbcf8206a54b68bfffbb0c4a14584dc93b9 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1094,6 +1094,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const // fetch next ofp, a new ofp and make it dirty ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); if (ret < 0) { + tdbFree(pBuf); return -1; } } @@ -1135,6 +1136,11 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); memcpy(pBuf + bytes, &pgno, sizeof(pgno)); + if (ofp == NULL) { + tdbFree(pBuf); + return -1; + } + ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); if (ret < 0) { tdbFree(pBuf); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index a2002a3ac46fd04cd8b0004f5fe4f0a22238e129..1e2b059adbf5d201140331039daf8a493c3dbe32 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -106,7 +106,8 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerBegin(pPager, pTxn); if (ret < 0) { - tdbError("failed to begin pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to begin pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, + pTxn->txnId); return -1; } } @@ -121,7 +122,8 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerCommit(pPager, pTxn); if (ret < 0) { - tdbError("failed to commit pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, + pTxn->txnId); return -1; } } @@ -151,7 +153,8 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerAbort(pPager, pTxn); if (ret < 0) { - tdbError("failed to abort pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to abort pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, + pTxn->txnId); return -1; } } diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 732a57639f2cc1a931a81058461034911a6729ee..667cd706c5e7814cce5d7a932f53fe59cc948b7c 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -105,7 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) { for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) { if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) { // TODO: handle error - tdbOsFree(pCache->aPage); + tdbOsFree(aPage); return -1; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 58f6cedc2268d847d53bd6f739e77506bb57b3c6..7c8ab16ec7f3814a7c0a45d27bfd2cb6f9a18a99 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -575,7 +575,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { - tdbError("failed to lseek due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset); + tdbError("failed to lseek due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -630,7 +630,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { i64 offset = pPager->pageSize * (pgno - 1); if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { - tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset); + tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset); terrno = TAOS_SYSTEM_ERROR(errno); tdbOsFree(pageBuf); return -1;