提交 5c7332c2 编写于 作者: D dapan1121

catalog update

上级 3d5f9a24
......@@ -214,6 +214,12 @@ typedef enum _mgmt_table {
extern char *taosMsg[];
typedef struct SBuildTableMetaInput {
int32_t vgId;
STagData *tagData;
char *tableFullName;
} SBuildTableMetaInput;
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
......@@ -768,6 +774,7 @@ typedef struct {
} SStableInfoMsg;
typedef struct {
SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN];
int8_t createFlag;
char tags[];
......
......@@ -30,6 +30,19 @@ extern "C" {
struct SCatalog;
typedef struct SVgroupInfo {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct SDBVgroupInfo {
int32_t vgroupVersion;
SArray *vgId;
int32_t hashRange;
int32_t hashNum;
} SDBVgroupInfo;
typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
SArray *pTableName; // table full name
......@@ -38,8 +51,8 @@ typedef struct SCatalogReq {
} SCatalogReq;
typedef struct SCatalogRsp {
SArray *pTableMeta; // tableMeta
SArray *pVgroupInfo; // vgroupInfo list
SArray *pTableMeta; // STableMeta array
SArray *pVgroupInfo; // SVgroupInfo list
SArray *pUdfList; // udf info list
SEpSet *pEpSet; // qnode epset list
} SCatalogRsp;
......@@ -78,11 +91,6 @@ typedef struct STableMeta {
SSchema schema[];
} STableMeta;
typedef struct SCatalogCfg {
} SCatalogCfg;
int32_t catalogInit(SCatalog *cfg);
/**
......@@ -91,9 +99,19 @@ int32_t catalogInit(SCatalog *cfg);
* @param clusterId
* @return
*/
struct SCatalog* catalogGetHandle(const char *clusterId);
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
int32_t catalogUpdateVgroupList(struct SCatalog* pCatalog, int32_t version, SArray* vgroupList);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroupInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta);
/**
......
......@@ -117,6 +117,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format")
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type")
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300)
......
......@@ -17,14 +17,16 @@
#include "taosmsg.h"
int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize) = {0};
int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
void msgInit() {
tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = buildTableMetaReqMsg;
tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg;
tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = ;
/*
......@@ -104,4 +106,63 @@ void msgInit() {
}
char* msgSerializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n);
pMsg += sizeof(n);
memcpy(pMsg, pTagData->name, n);
pMsg += n;
*(int32_t*)pMsg = htonl(pTagData->dataLen);
pMsg += sizeof(int32_t);
memcpy(pMsg, pTagData->data, pTagData->dataLen);
pMsg += pTagData->dataLen;
return pMsg;
}
int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;
int32_t estimateSize = sizeof(STableInfoMsg) + (bInput->tagData ? (sizeof(*bInput->tagData) + bInput->tagData->dataLen) : 0);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = calloc(1, estimateSize);
if (NULL == *msg) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->msgHead.vgId = bInput->vgId;
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
int32_t autoCreate = (bInput->tagData && bInput->tagData->dataLen > 0);
bMsg->createFlag = htons(autoCreate ? 1 : 0);
char *pMsg = NULL;
// tag data exists
if (autoCreate) {
pMsg = msgSerializeTagData(bInput->tagData, (char *)bMsg->tags);
}
*msgLen = (int32_t)(pMsg - (char*)bMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -24,8 +24,24 @@ extern "C" {
#define CTG_DEFAULT_CLUSTER_NUMBER 3
typedef struct SCatalog {
typedef struct SVgroupListCache {
int32_t vgroupNum;
int32_t vgroupVersion;
SHashObj *cache; //key:vgId, value:SVgroupInfo
} SVgroupListCache;
typedef struct SDBVgroupCache {
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
} SDBVgroupCache;
typedef struct STableMetaCache {
SHashObj *cache; //key:fulltablename, value:STableMeta
} STableMetaCache;
typedef struct SCatalog {
SVgroupListCache vgroupCache;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
} SCatalog;
typedef struct SCatalogMgmt {
......
......@@ -24,8 +24,6 @@ int32_t catalogInit(SCatalog *cfg) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_EROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER);
}
ctgGetVnodeInfo();
return TSDB_CODE_SUCCESS;
}
......@@ -62,11 +60,31 @@ struct SCatalog* catalogGetHandle(const char *clusterId) {
return clusterCtg;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) {
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
SBuildTableMetaInput bInput = {0};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
if (code) {
return code;
}
SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_TABLE_META,
.pCont = msg,
.contLen = msgLen,
.ahandle = (void*)pSql->self,
.handle = NULL,
.code = 0
};
rpcSendRequest(pRpcObj->pDnodeConn, pVnodeEpSet, &rpcMsg, &pSql->rpcRid);
}
......
......@@ -1433,23 +1433,6 @@ void* vgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
char* serializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n);
pMsg += sizeof(n);
memcpy(pMsg, pTagData->name, n);
pMsg += n;
*(int32_t*)pMsg = htonl(pTagData->dataLen);
pMsg += sizeof(int32_t);
memcpy(pMsg, pTagData->data, pTagData->dataLen);
pMsg += pTagData->dataLen;
return pMsg;
}
int32_t copyTagData(STagData* dst, const STagData* src) {
dst->dataLen = src->dataLen;
tstrncpy(dst->name, src->name, tListLen(dst->name));
......
......@@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_TAG_NAMES, "duplicated tag names"
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册