From 5c7332c2fcfd2989415a3674f1b046856d620423 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 13 Dec 2021 17:10:31 +0800 Subject: [PATCH] catalog update --- include/common/taosmsg.h | 7 +++ include/libs/catalog/catalog.h | 36 +++++++++++---- include/util/taoserror.h | 1 + source/common/src/tmessage.c | 67 ++++++++++++++++++++++++++-- source/libs/catalog/inc/catalogInt.h | 18 +++++++- source/libs/catalog/src/catalog.c | 24 ++++++++-- source/libs/parser/src/parserUtil.c | 17 ------- source/util/src/terror.c | 1 + 8 files changed, 138 insertions(+), 33 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2ce6da9806..fd55a11b52 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -214,6 +214,12 @@ typedef enum _mgmt_table { extern char *taosMsg[]; +typedef struct SBuildTableMetaInput { + int32_t vgId; + STagData *tagData; + char *tableFullName; +} SBuildTableMetaInput; + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -768,6 +774,7 @@ typedef struct { } SStableInfoMsg; typedef struct { + SMsgHead msgHead; char tableFname[TSDB_TABLE_FNAME_LEN]; int8_t createFlag; char tags[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 8aacede5fe..2092f53ba1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,6 +30,19 @@ extern "C" { struct SCatalog; +typedef struct SVgroupInfo { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupInfo; + +typedef struct SDBVgroupInfo { + int32_t vgroupVersion; + SArray *vgId; + int32_t hashRange; + int32_t hashNum; +} SDBVgroupInfo; + typedef struct SCatalogReq { char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name @@ -38,8 +51,8 @@ typedef struct SCatalogReq { } SCatalogReq; typedef struct SCatalogRsp { - SArray *pTableMeta; // tableMeta - SArray *pVgroupInfo; // vgroupInfo list + SArray *pTableMeta; // STableMeta array + SArray *pVgroupInfo; // SVgroupInfo list SArray *pUdfList; // udf info list SEpSet *pEpSet; // qnode epset list } SCatalogRsp; @@ -78,11 +91,6 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -typedef struct SCatalogCfg { - -} SCatalogCfg; - - int32_t catalogInit(SCatalog *cfg); /** @@ -91,9 +99,19 @@ int32_t catalogInit(SCatalog *cfg); * @param clusterId * @return */ -struct SCatalog* catalogGetHandle(const char *clusterId); +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); + +int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList); + +int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); + +int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); + +int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); /** diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 065f1ee0ab..a3a4297115 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -117,6 +117,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format") #define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type") #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range") +#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input") // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 8609e8f09a..0e732caa26 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -17,14 +17,16 @@ #include "taosmsg.h" -int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0}; +int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; void msgInit() { - tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg; - + tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; + + + tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; /* @@ -104,4 +106,63 @@ void msgInit() { } +char* msgSerializeTagData(STagData* pTagData, char* pMsg) { + int32_t n = (int32_t) strlen(pTagData->name); + *(int32_t*) pMsg = htonl(n); + pMsg += sizeof(n); + + memcpy(pMsg, pTagData->name, n); + pMsg += n; + + *(int32_t*)pMsg = htonl(pTagData->dataLen); + pMsg += sizeof(int32_t); + + memcpy(pMsg, pTagData->data, pTagData->dataLen); + pMsg += pTagData->dataLen; + + return pMsg; +} + + +int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; + + int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + + bMsg->msgHead.vgId = bInput->vgId; + + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); + bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + + int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0); + + bMsg->createFlag = htons(autoCreate ? 1 : 0); + + char *pMsg = NULL; + + // tag data exists + if (autoCreate) { + pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags); + } + + *msgLen = (int32_t)(pMsg - (char*)bMsg); + + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8703f1f0ce..82d7d9c571 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -24,8 +24,24 @@ extern "C" { #define CTG_DEFAULT_CLUSTER_NUMBER 3 -typedef struct SCatalog { +typedef struct SVgroupListCache { + int32_t vgroupNum; + int32_t vgroupVersion; + SHashObj *cache; //key:vgId, value:SVgroupInfo +} SVgroupListCache; + +typedef struct SDBVgroupCache { + SHashObj *cache; //key:dbname, value:SDBVgroupInfo +} SDBVgroupCache; +typedef struct STableMetaCache { + SHashObj *cache; //key:fulltablename, value:STableMeta +} STableMetaCache; + +typedef struct SCatalog { + SVgroupListCache vgroupCache; + SDBVgroupCache dbCache; + STableMetaCache tableCache; } SCatalog; typedef struct SCatalogMgmt { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a305df05d0..e8b79bae4b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -24,8 +24,6 @@ int32_t catalogInit(SCatalog *cfg) { CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); } - ctgGetVnodeInfo(); - return TSDB_CODE_SUCCESS; } @@ -62,11 +60,31 @@ struct SCatalog* catalogGetHandle(const char *clusterId) { return clusterCtg; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } + SBuildTableMetaInput bInput = {0}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_TABLE_META, + .pCont = msg, + .contLen = msgLen, + .ahandle = (void*)pSql->self, + .handle = NULL, + .code = 0 + }; + + rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid); } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 3e83381a76..f154599aae 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1433,23 +1433,6 @@ void* vgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } -char* serializeTagData(STagData* pTagData, char* pMsg) { - int32_t n = (int32_t) strlen(pTagData->name); - *(int32_t*) pMsg = htonl(n); - pMsg += sizeof(n); - - memcpy(pMsg, pTagData->name, n); - pMsg += n; - - *(int32_t*)pMsg = htonl(pTagData->dataLen); - pMsg += sizeof(int32_t); - - memcpy(pMsg, pTagData->data, pTagData->dataLen); - pMsg += pTagData->dataLen; - - return pMsg; -} - int32_t copyTagData(STagData* dst, const STagData* src) { dst->dataLen = src->dataLen; tstrncpy(dst->name, src->name, tListLen(dst->name)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c4ca44f1d2..a75ce747b2 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_TAG_NAMES, "duplicated tag names" TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") -- GitLab