diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2ce6da98069e3bf83bd99849c3205397c50fec81..fd55a11b529aa3eede25fe451366d63cc489f0d6 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -214,6 +214,12 @@ typedef enum _mgmt_table { extern char *taosMsg[]; +typedef struct SBuildTableMetaInput { + int32_t vgId; + STagData *tagData; + char *tableFullName; +} SBuildTableMetaInput; + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -768,6 +774,7 @@ typedef struct { } SStableInfoMsg; typedef struct { + SMsgHead msgHead; char tableFname[TSDB_TABLE_FNAME_LEN]; int8_t createFlag; char tags[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 8aacede5febab228023c0eab451bfe8eaae1cbe3..2092f53ba1d4316a0c72416b056f5709f24eae3c 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,6 +30,19 @@ extern "C" { struct SCatalog; +typedef struct SVgroupInfo { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupInfo; + +typedef struct SDBVgroupInfo { + int32_t vgroupVersion; + SArray *vgId; + int32_t hashRange; + int32_t hashNum; +} SDBVgroupInfo; + typedef struct SCatalogReq { char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name @@ -38,8 +51,8 @@ typedef struct SCatalogReq { } SCatalogReq; typedef struct SCatalogRsp { - SArray *pTableMeta; // tableMeta - SArray *pVgroupInfo; // vgroupInfo list + SArray *pTableMeta; // STableMeta array + SArray *pVgroupInfo; // SVgroupInfo list SArray *pUdfList; // udf info list SEpSet *pEpSet; // qnode epset list } SCatalogRsp; @@ -78,11 +91,6 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -typedef struct SCatalogCfg { - -} SCatalogCfg; - - int32_t catalogInit(SCatalog *cfg); /** @@ -91,9 +99,19 @@ int32_t catalogInit(SCatalog *cfg); * @param clusterId * @return */ -struct SCatalog* catalogGetHandle(const char *clusterId); +int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); + +int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); + +int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList); + +int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); + +int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); + +int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta); /** diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 065f1ee0ab2bcc47706b0e17cab96d4182b227e0..a3a4297115faf70e217e26b7cab5a24cad2fb275 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -117,6 +117,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format") #define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type") #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range") +#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input") // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) diff --git a/source/common/src/tmessage.c b/source/common/src/tmessage.c index 8609e8f09a809a98f140afe28f8de27977362f27..0e732caa260f42cc2ca683c26b39a8c49d2cdeff 100644 --- a/source/common/src/tmessage.c +++ b/source/common/src/tmessage.c @@ -17,14 +17,16 @@ #include "taosmsg.h" -int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0}; +int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; void msgInit() { - tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg; - + tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; + + + tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ; /* @@ -104,4 +106,63 @@ void msgInit() { } +char* msgSerializeTagData(STagData* pTagData, char* pMsg) { + int32_t n = (int32_t) strlen(pTagData->name); + *(int32_t*) pMsg = htonl(n); + pMsg += sizeof(n); + + memcpy(pMsg, pTagData->name, n); + pMsg += n; + + *(int32_t*)pMsg = htonl(pTagData->dataLen); + pMsg += sizeof(int32_t); + + memcpy(pMsg, pTagData->data, pTagData->dataLen); + pMsg += pTagData->dataLen; + + return pMsg; +} + + +int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; + + int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + + bMsg->msgHead.vgId = bInput->vgId; + + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); + bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + + int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0); + + bMsg->createFlag = htons(autoCreate ? 1 : 0); + + char *pMsg = NULL; + + // tag data exists + if (autoCreate) { + pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags); + } + + *msgLen = (int32_t)(pMsg - (char*)bMsg); + + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8703f1f0ceec0f815681ad93183594dded51c8aa..82d7d9c571a291f876e09aeed8651db6f95e63ef 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -24,8 +24,24 @@ extern "C" { #define CTG_DEFAULT_CLUSTER_NUMBER 3 -typedef struct SCatalog { +typedef struct SVgroupListCache { + int32_t vgroupNum; + int32_t vgroupVersion; + SHashObj *cache; //key:vgId, value:SVgroupInfo +} SVgroupListCache; + +typedef struct SDBVgroupCache { + SHashObj *cache; //key:dbname, value:SDBVgroupInfo +} SDBVgroupCache; +typedef struct STableMetaCache { + SHashObj *cache; //key:fulltablename, value:STableMeta +} STableMetaCache; + +typedef struct SCatalog { + SVgroupListCache vgroupCache; + SDBVgroupCache dbCache; + STableMetaCache tableCache; } SCatalog; typedef struct SCatalogMgmt { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a305df05d0c817b7e970b57fa53b582377c11d4d..e8b79bae4b64f3f40fadb744039e95be5e0e1a7d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -24,8 +24,6 @@ int32_t catalogInit(SCatalog *cfg) { CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); } - ctgGetVnodeInfo(); - return TSDB_CODE_SUCCESS; } @@ -62,11 +60,31 @@ struct SCatalog* catalogGetHandle(const char *clusterId) { return clusterCtg; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { return TSDB_CODE_CTG_INVALID_INPUT; } + SBuildTableMetaInput bInput = {0}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_TABLE_META, + .pCont = msg, + .contLen = msgLen, + .ahandle = (void*)pSql->self, + .handle = NULL, + .code = 0 + }; + + rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid); } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 3e83381a76e6145fa7157459f20c10460a3687e9..f154599aae9fd306438329ab074da4f18b643cba 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1433,23 +1433,6 @@ void* vgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } -char* serializeTagData(STagData* pTagData, char* pMsg) { - int32_t n = (int32_t) strlen(pTagData->name); - *(int32_t*) pMsg = htonl(n); - pMsg += sizeof(n); - - memcpy(pMsg, pTagData->name, n); - pMsg += n; - - *(int32_t*)pMsg = htonl(pTagData->dataLen); - pMsg += sizeof(int32_t); - - memcpy(pMsg, pTagData->data, pTagData->dataLen); - pMsg += pTagData->dataLen; - - return pMsg; -} - int32_t copyTagData(STagData* dst, const STagData* src) { dst->dataLen = src->dataLen; tstrncpy(dst->name, src->name, tListLen(dst->name)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c4ca44f1d256f69a009286d7678d9d254405335a..a75ce747b229212f151bf2927f22c382ae95932f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_TAG_NAMES, "duplicated tag names" TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")