未验证 提交 a01d49b2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17522 from taosdata/feat/TD-19386

feat: add retrieve DB and table route info APIs
......@@ -126,6 +126,22 @@ typedef struct setConfRet {
char retMsg[RET_MSG_LENGTH];
} setConfRet;
typedef struct TAOS_VGROUP_HASH_INFO {
int32_t vgId;
uint32_t hashBegin;
uint32_t hashEnd;
} TAOS_VGROUP_HASH_INFO ;
typedef struct TAOS_DB_ROUTE_INFO {
int32_t routeVersion;
int16_t hashPrefix;
int16_t hashSuffix;
int8_t hashMethod;
int32_t vgNum;
TAOS_VGROUP_HASH_INFO *vgHash;
} TAOS_DB_ROUTE_INFO ;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
......@@ -196,6 +212,9 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, vo
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
DLL_EXPORT int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo);
DLL_EXPORT int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision);
......
......@@ -163,7 +163,9 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, const char* pDBName, SArray** pVgroupList);
int32_t catalogGetDBVgList(SCatalog* pCatalog, SRequestConnInfo* pConn, const char* pDBName, SArray** pVgroupList);
int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, TAOS_DB_ROUTE_INFO* pInfo);
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
......
......@@ -643,7 +643,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pInst->mgmtEp)};
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, &pVgList);
code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
if (code) {
goto _return;
}
......
......@@ -989,6 +989,106 @@ const void *taos_get_raw_block(TAOS_RES *res) {
return pRequest->body.resInfo.pData;
}
int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
if (NULL == db || NULL == dbInfo) {
tscError("invalid input param, db:%p, dbInfo:%p", db, dbInfo);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
int64_t connId = *(int64_t *)taos;
SRequestObj *pRequest = NULL;
char *sql = "taos_get_db_route_info";
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return terrno;
}
STscObj *pTscObj = pRequest->pTscObj;
SCatalog *pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) {
goto _return;
}
SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, dbInfo);
if (code) {
goto _return;
}
_return:
terrno = code;
destroyRequest(pRequest);
return code;
}
int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
if (NULL == db || NULL == table || NULL == vgId) {
tscError("invalid input param, db:%p, table:%p, vgId:%p", db, table, vgId);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
int64_t connId = *(int64_t *)taos;
SRequestObj *pRequest = NULL;
char *sql = "taos_get_table_vgId";
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
return terrno;
}
STscObj *pTscObj = pRequest->pTscObj;
SCatalog *pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) {
goto _return;
}
SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SName tableName;
toName(pTscObj->acctId, db, table, &tableName);
SVgroupInfo vgInfo;
code = catalogGetTableHashVgroup(pCtg, &conn, &tableName, &vgInfo);
if (code) {
goto _return;
}
*vgId = vgInfo.vgId;
_return:
terrno = code;
destroyRequest(pRequest);
return code;
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
......
......@@ -739,7 +739,7 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SArray** vgroupList) {
int32_t catalogGetDBVgList(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SArray** vgroupList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == pConn || NULL == vgroupList) {
......@@ -778,6 +778,64 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, TAOS_DB_ROUTE_INFO* pInfo) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == pConn || NULL == pInfo) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
SDBVgInfo* dbInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &dbInfo, NULL));
if (dbCache) {
dbInfo = dbCache->vgCache.vgInfo;
}
pInfo->routeVersion = dbInfo->vgVersion;
pInfo->hashPrefix = dbInfo->hashPrefix;
pInfo->hashSuffix = dbInfo->hashSuffix;
pInfo->hashMethod = dbInfo->hashMethod;
pInfo->vgNum = taosHashGetSize(dbInfo->vgHash);
if (pInfo->vgNum <= 0) {
ctgError("invalid vgNum %d in db %s's vgHash", pInfo->vgNum, dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pInfo->vgHash = taosMemoryCalloc(pInfo->vgNum, sizeof(TAOS_VGROUP_HASH_INFO));
if (NULL == pInfo->vgHash) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
SVgroupInfo* vgInfo = NULL;
int32_t i = 0;
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
pInfo->vgHash[i].vgId = vgInfo->vgId;
pInfo->vgHash[i].hashBegin = vgInfo->hashBegin;
pInfo->vgHash[i].hashEnd = vgInfo->hashEnd;
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
++i;
}
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} else if (dbInfo) {
taosHashCleanup(dbInfo->vgHash);
taosMemoryFreeClear(dbInfo);
}
CTG_API_LEAVE(code);
}
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
CTG_API_ENTER();
......
......@@ -778,7 +778,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
int32_t n = 0;
while (!ctgTestStop) {
code = catalogGetDBVgInfo(pCtg, mockPointer, ctgTestDbname, &vgList);
code = catalogGetDBVgList(pCtg, mockPointer, ctgTestDbname, &vgList);
if (code) {
assert(0);
}
......@@ -2063,7 +2063,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetDBVgInfo(pCtg, mockPointer, ctgTestDbname, &vgList);
code = catalogGetDBVgList(pCtg, mockPointer, ctgTestDbname, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
......
......@@ -417,11 +417,11 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetDBVgInfo(pParCxt->pCatalog, &conn, fullDbName, pVgInfo);
code = catalogGetDBVgList(pParCxt->pCatalog, &conn, fullDbName, pVgInfo);
}
}
if (TSDB_CODE_SUCCESS != code) {
parserError("0x%" PRIx64 " catalogGetDBVgInfo error, code:%s, dbFName:%s", pCxt->pParseCxt->requestId,
parserError("0x%" PRIx64 " catalogGetDBVgList error, code:%s, dbFName:%s", pCxt->pParseCxt->requestId,
tstrerror(code), fullDbName);
}
return code;
......
......@@ -243,8 +243,8 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
return 0;
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
int32_t __catalogGetDBVgList(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgList(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) {
......@@ -293,7 +293,7 @@ void initMetaDataEnv() {
stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo);
stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion);
stub.set(catalogGetDBVgInfo, __catalogGetDBVgInfo);
stub.set(catalogGetDBVgList, __catalogGetDBVgList);
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
stub.set(catalogChkAuth, __catalogChkAuth);
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
......
......@@ -132,7 +132,7 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
int32_t catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const {
std::string dbFName(pDbFName);
DbMetaCache::const_iterator it = meta_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (meta_.end() == it) {
......@@ -663,8 +663,8 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
int32_t MockCatalogService::catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
return impl_->catalogGetDBVgInfo(pDbFName, pVgList);
int32_t MockCatalogService::catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const {
return impl_->catalogGetDBVgList(pDbFName, pVgList);
}
int32_t MockCatalogService::catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const {
......
......@@ -70,7 +70,7 @@ class MockCatalogService {
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS asynchronous API example
// this example opens multiple tables, insert/retrieve multiple tables
// it is used by TAOS internally for one performance testing
// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "taos.h"
int rtTables = 20;
char hostName[128];
static void rtExecSQL(TAOS *taos, char *command) {
int i;
int32_t code = -1;
TAOS_RES *pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
taos_cleanup();
exit(EXIT_FAILURE);
}
taos_free_result(pSql);
}
static void rtFetchVgId(TAOS *taos, char *sql, int *vgId) {
int i;
int32_t code = -1;
TAOS_RES *pSql = taos_query(taos, sql);
code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", sql, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
taos_cleanup();
exit(EXIT_FAILURE);
}
TAOS_ROW row = taos_fetch_row(pSql);
*vgId = *(int*)row[0];
taos_free_result(pSql);
}
void rtError(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
}
void rtExit(char* prefix, const char* errMsg) {
rtError(prefix, errMsg);
exit(1);
}
int rtPrepare(TAOS ** p, int prefix, int suffix) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) rtExit("taos_connect", taos_errstr(NULL));
strcpy(sql, "drop database if exists db1");
rtExecSQL(taos, sql);
sprintf(sql, "create database db1 vgroups 10 table_prefix %d table_suffix %d", prefix, suffix);
rtExecSQL(taos, sql);
strcpy(sql, "use db1");
rtExecSQL(taos, sql);
for (int32_t i = 0; i < rtTables; ++i) {
sprintf(sql, "create table tb%d (ts timestamp, f1 int)", i);
rtExecSQL(taos, sql);
}
*p = taos;
return 0;
}
int rtGetDbRouteInfo(TAOS * taos) {
TAOS_DB_ROUTE_INFO dbInfo;
int code = taos_get_db_route_info(taos, "db1", &dbInfo);
if (code) {
rtExit("taos_get_db_route_info", taos_errstr(NULL));
}
printf("db db1 routeVersion:%d hashPrefix:%d hashSuffix:%d hashMethod:%d vgNum %d\n",
dbInfo.routeVersion, dbInfo.hashPrefix, dbInfo.hashSuffix, dbInfo.hashMethod, dbInfo.vgNum);
for (int32_t i = 0; i < dbInfo.vgNum; ++i) {
printf("%dth vg, id:%d hashBegin:%u hashEnd:%u\n",
i, dbInfo.vgHash[i].vgId, dbInfo.vgHash[i].hashBegin, dbInfo.vgHash[i].hashEnd);
}
return 0;
}
int rtGetTableRouteInfo(TAOS * taos) {
char table[64] = {0};
int vgId1 = 0;
int vgId2 = 0;
char sql[1024] = {0};
for (int32_t i = 0; i < rtTables; ++i) {
sprintf(table, "tb%d", i);
int code = taos_get_table_vgId(taos, "db1", table, &vgId1);
if (code) {
rtExit("taos_get_table_vgId", taos_errstr(NULL));
}
sprintf(sql, "select vgroup_id from information_schema.ins_tables where table_name=\"tb%d\"", i);
rtFetchVgId(taos, sql, &vgId2);
if (vgId1 != vgId2) {
fprintf(stderr, "!!!! table tb%d vgId mis-match, vgId(api):%d, vgId(sys):%d\n", i, vgId1, vgId2);
exit(1);
} else {
printf("table tb%d vgId %d\n", i, vgId1);
}
}
return 0;
}
void rtClose(TAOS * taos) {
taos_close(taos);
}
int rtRunCase1(void) {
TAOS *taos = NULL;
rtPrepare(&taos, 0, 0);
rtGetDbRouteInfo(taos);
rtGetTableRouteInfo(taos);
rtClose(taos);
return 0;
}
int rtRunCase2(void) {
TAOS *taos = NULL;
rtPrepare(&taos, 2, 0);
rtGetTableRouteInfo(taos);
rtGetDbRouteInfo(taos);
rtClose(taos);
return 0;
}
int main(int argc, char *argv[]) {
if (argc != 2) {
printf("usage: %s server-ip\n", argv[0]);
exit(0);
}
srand((unsigned int)time(NULL));
strcpy(hostName, argv[1]);
rtRunCase1();
rtRunCase2();
int32_t l = 5;
while (l) {
printf("%d\n", l--);
sleep(1);
}
return 0;
}
......@@ -13,6 +13,7 @@ all: $(TARGET)
exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS)
gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS)
clean:
rm $(ROOT)batchprepare
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册