From 3d5f9a244dafd4e1eca20fa1da5a63b14196c8bb Mon Sep 17 00:00:00 2001 From: dapan Date: Mon, 13 Dec 2021 08:10:06 +0800 Subject: [PATCH] catalog init --- include/libs/catalog/catalog.h | 14 +++-- include/util/taoserror.h | 7 +++ source/common/src/tmessage.c | 89 ++++++++++++++++++++++++++++ source/libs/catalog/inc/catalogInt.h | 20 ++++++- source/libs/catalog/src/catalog.c | 63 +++++++++++++++++++- source/util/src/terror.c | 11 +++- 6 files changed, 194 insertions(+), 10 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b04b4f5c8d..8aacede5fe 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -31,10 +31,10 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char clusterId[TSDB_CLUSTER_ID_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name SArray *pUdf; // udf name - bool qNodeEpset; // valid qnode + bool qNodeRequired; // valid qnode } SCatalogReq; typedef struct SCatalogRsp { @@ -93,6 +93,9 @@ int32_t catalogInit(SCatalog *cfg); */ struct SCatalog* catalogGetHandle(const char *clusterId); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); + + /** * Get the required meta data from mnode. * Note that this is a synchronized API and is also thread-safety. @@ -102,11 +105,14 @@ struct SCatalog* catalogGetHandle(const char *clusterId); * @param pMetaData * @return */ -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pCatalogReq, SCatalogRsp* pCatalogData); +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, SCatalogRsp* pCatalogData); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); + +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); + /** diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 36301466f8..065f1ee0ab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -490,6 +490,13 @@ int32_t* taosGetErrno(); // monitor #define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection") +// catalog +#define TSDB_CODE_CTG_INTERNAL_EROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error +#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters +#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready +#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error +#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 0b6dbfdb51..8609e8f09a 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -16,3 +16,92 @@ #define TAOS_MESSAGE_C #include "taosmsg.h" + +int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0}; + +int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; + + +void msgInit() { + tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg; + + tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; + +/* + tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; + tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; + tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; + + tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; + tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; + tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg; + + tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg; + tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg; + + tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg; + tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; + tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; + tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; + tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg; + tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg; + tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; + tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; + tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; + tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; + tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; + tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; + tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; + tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; + tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg; + + tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; + tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; + tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; + + tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; + tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg; + tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; + tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; + tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; + + tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp; + tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode; + + tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp; + tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp; + tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp; + tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; + tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; + tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; + tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; + + tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. + tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; + + tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp; + tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp; + + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; + + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; + + tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; + tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; + tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp; + + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; + tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; +*/ +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 60cc1771e2..8703f1f0ce 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -22,17 +22,33 @@ extern "C" { #include "catalog.h" +#define CTG_DEFAULT_CLUSTER_NUMBER 3 + typedef struct SCatalog { } SCatalog; typedef struct SCatalogMgmt { void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata - SHashObj *pMeta; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node + SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node } SCatalogMgmt; + + +#define ctgFatal(...) tscFatal(__VA_ARGS__) +#define ctgError(...) tscError(__VA_ARGS__) +#define ctgWarn(...) tscWarn(__VA_ARGS__) +#define ctgInfo(...) tscInfo(__VA_ARGS__) +#define ctgDebug(...) tscDebug(__VA_ARGS__) +#define ctgTrace(...) tscTrace(__VA_ARGS__) + +#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0) +#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + + #ifdef __cplusplus } #endif -#endif /*_TD_CATALOG_INT_H_*/ \ No newline at end of file +#endif /*_TD_CATALOG_INT_H_*/ diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index cd6e357f43..a305df05d0 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -19,14 +19,71 @@ SCatalogMgmt ctgMgmt = {0}; int32_t catalogInit(SCatalog *cfg) { - ctgMgmt = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == ctgMgmt.pCluster) { + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); + } + + ctgGetVnodeInfo(); + + return TSDB_CODE_SUCCESS; } struct SCatalog* catalogGetHandle(const char *clusterId) { - return (struct SCatalog*) 0x1; + if (NULL == clusterId) { + return NULL; + } + + if (NULL == ctgMgmt.pCluster) { + ctgError("cluster cache are not ready"); + return NULL; + } + + size_t clen = strlen(clusterId); + SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); + + if (clusterCtg) { + return clusterCtg; + } + + clusterCtg = calloc(1, sizeof(*clusterCtg)); + if (NULL == clusterCtg) { + ctgError("calloc %d failed", sizeof(*clusterCtg)); + return NULL; + } + + if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { + ctgError("put cluster %s cache to hash failed", clusterId); + tfree(clusterCtg); + return NULL; + } + + return clusterCtg; +} + +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + } -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pMetaReq, SCatalogRsp* pMetaData) { + +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp) { + if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + return 0; } + +void catalogDestroy(void) { + if (ctgMgmt.pCluster) { + taosHashCleanup(ctgMgmt.pCluster); //TBD + ctgMgmt.pCluster = NULL; + } +} + + + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 5110c8ba22..c4ca44f1d2 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -496,6 +496,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, "tfs file already exis TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") +// catalog +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_EROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") + + + #ifdef TAOS_ERROR_C }; #endif @@ -544,4 +553,4 @@ const char* tstrerror(int32_t err) { return ""; } -const char* terrstr() { return tstrerror(terrno); } \ No newline at end of file +const char* terrstr() { return tstrerror(terrno); } -- GitLab