提交 3d5f9a24 编写于 作者: D dapan

catalog init

上级 d962a271
...@@ -31,10 +31,10 @@ extern "C" { ...@@ -31,10 +31,10 @@ extern "C" {
struct SCatalog; struct SCatalog;
typedef struct SCatalogReq { typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN]; //????
SArray *pTableName; // table full name SArray *pTableName; // table full name
SArray *pUdf; // udf name SArray *pUdf; // udf name
bool qNodeEpset; // valid qnode bool qNodeRequired; // valid qnode
} SCatalogReq; } SCatalogReq;
typedef struct SCatalogRsp { typedef struct SCatalogRsp {
...@@ -93,6 +93,9 @@ int32_t catalogInit(SCatalog *cfg); ...@@ -93,6 +93,9 @@ int32_t catalogInit(SCatalog *cfg);
*/ */
struct SCatalog* catalogGetHandle(const char *clusterId); struct SCatalog* catalogGetHandle(const char *clusterId);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta);
/** /**
* Get the required meta data from mnode. * Get the required meta data from mnode.
* Note that this is a synchronized API and is also thread-safety. * Note that this is a synchronized API and is also thread-safety.
...@@ -102,11 +105,14 @@ struct SCatalog* catalogGetHandle(const char *clusterId); ...@@ -102,11 +105,14 @@ struct SCatalog* catalogGetHandle(const char *clusterId);
* @param pMetaData * @param pMetaData
* @return * @return
*/ */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pCatalogReq, SCatalogRsp* pCatalogData); int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, SCatalogRsp* pCatalogData); int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
/** /**
......
...@@ -490,6 +490,13 @@ int32_t* taosGetErrno(); ...@@ -490,6 +490,13 @@ int32_t* taosGetErrno();
// monitor // monitor
#define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection") #define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection")
// catalog
#define TSDB_CODE_CTG_INTERNAL_EROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,3 +16,92 @@ ...@@ -16,3 +16,92 @@
#define TAOS_MESSAGE_C #define TAOS_MESSAGE_C
#include "taosmsg.h" #include "taosmsg.h"
int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0};
int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
void msgInit() {
tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg;
tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ;
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
*/
}
...@@ -22,17 +22,33 @@ extern "C" { ...@@ -22,17 +22,33 @@ extern "C" {
#include "catalog.h" #include "catalog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 3
typedef struct SCatalog { typedef struct SCatalog {
} SCatalog; } SCatalog;
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pMeta; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
} SCatalogMgmt; } SCatalogMgmt;
#define ctgFatal(...) tscFatal(__VA_ARGS__)
#define ctgError(...) tscError(__VA_ARGS__)
#define ctgWarn(...) tscWarn(__VA_ARGS__)
#define ctgInfo(...) tscInfo(__VA_ARGS__)
#define ctgDebug(...) tscDebug(__VA_ARGS__)
#define ctgTrace(...) tscTrace(__VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_CATALOG_INT_H_*/ #endif /*_TD_CATALOG_INT_H_*/
\ No newline at end of file
...@@ -19,14 +19,71 @@ SCatalogMgmt ctgMgmt = {0}; ...@@ -19,14 +19,71 @@ SCatalogMgmt ctgMgmt = {0};
int32_t catalogInit(SCatalog *cfg) { int32_t catalogInit(SCatalog *cfg) {
ctgMgmt = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER);
}
ctgGetVnodeInfo();
return TSDB_CODE_SUCCESS;
} }
struct SCatalog* catalogGetHandle(const char *clusterId) { struct SCatalog* catalogGetHandle(const char *clusterId) {
return (struct SCatalog*) 0x1; if (NULL == clusterId) {
return NULL;
}
if (NULL == ctgMgmt.pCluster) {
ctgError("cluster cache are not ready");
return NULL;
}
size_t clen = strlen(clusterId);
SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
if (clusterCtg) {
return clusterCtg;
}
clusterCtg = calloc(1, sizeof(*clusterCtg));
if (NULL == clusterCtg) {
ctgError("calloc %d failed", sizeof(*clusterCtg));
return NULL;
}
if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
ctgError("put cluster %s cache to hash failed", clusterId);
tfree(clusterCtg);
return NULL;
}
return clusterCtg;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
} }
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pMetaReq, SCatalogRsp* pMetaData) {
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SCatalogRsp* pRsp) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
return 0; return 0;
} }
void catalogDestroy(void) {
if (ctgMgmt.pCluster) {
taosHashCleanup(ctgMgmt.pCluster); //TBD
ctgMgmt.pCluster = NULL;
}
}
...@@ -496,6 +496,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, "tfs file already exis ...@@ -496,6 +496,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, "tfs file already exis
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
// catalog
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_EROR, "catalog interval error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
#endif #endif
...@@ -544,4 +553,4 @@ const char* tstrerror(int32_t err) { ...@@ -544,4 +553,4 @@ const char* tstrerror(int32_t err) {
return ""; return "";
} }
const char* terrstr() { return tstrerror(terrno); } const char* terrstr() { return tstrerror(terrno); }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册