未验证 提交 60be4f73 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9389 from taosdata/feature/vnode

more
......@@ -19,6 +19,7 @@
#include "mallocator.h"
#include "os.h"
#include "trow.h"
#include "tmsg.h"
#ifdef __cplusplus
extern "C" {
......@@ -36,39 +37,7 @@ typedef struct SMetaCfg {
uint64_t lruSize;
} SMetaCfg;
typedef struct STbCfg {
/// name of the table
char *name;
/// time to live of the table
uint32_t ttl;
/// keep time of this table
uint32_t keep;
/// type of table
uint8_t type;
union {
/// super table configurations
struct {
/// super table UID
tb_uid_t suid;
/// row schema
STSchema *pSchema;
/// tag schema
STSchema *pTagSchema;
} stbCfg;
/// normal table configuration
struct {
/// row schema
STSchema *pSchema;
} ntbCfg;
/// child table configuration
struct {
/// super table UID
tb_uid_t suid;
SKVRow pTag;
} ctbCfg;
};
} STbCfg;
typedef SVCreateTbReq STbCfg;
// SMeta operations
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
......
......@@ -822,12 +822,12 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
assert(ptr != NULL);
}
//vnodeProcessWMsgs(pVnode->pImpl, pArray);
vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
int32_t code = 0; //vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle;
rpcSendResponse(pRsp);
......
......@@ -200,10 +200,12 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
return mndAcquireDb(pMnode, db);
}
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *contLen) {
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
#if 1
SVCreateTbReq req;
void * buf;
int bsize;
SMsgHead * pMsgHead;
req.ver = 0;
req.name = pStb->name;
......@@ -217,19 +219,24 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
bsize = tSerializeSVCreateTbReq(NULL, &req);
buf = malloc(bsize);
buf = malloc(sizeof(SMsgHead) + bsize);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pBuf = buf;
pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize);
pMsgHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
*contLen = bsize;
*pContLen = sizeof(SMsgHead) + bsize;
return buf;
#if 0
#else
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
......@@ -255,7 +262,9 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
}
return pCreate;
*pContLen = contLen;
return pCreate;
#endif
}
......
......@@ -20,5 +20,5 @@ target_link_libraries(
# test
if(${BUILD_TEST})
add_subdirectory(test)
# add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -34,7 +34,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version
void * pBuf = pMsg->pCont;
void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver);
......@@ -51,8 +51,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVnodeReq vReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
SVnodeReq vReq;
SVCreateTbReq vCreateTbReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
......@@ -62,17 +63,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// todo: change the interface here
uint64_t ver;
taosDecodeFixedU64(pMsg->pCont, &ver);
taosDecodeFixedU64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
// TODO: handle error
}
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB:
case TDMT_MND_CREATE_TABLE:
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error
}
......@@ -90,6 +89,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
break;
default:
ASSERT(0);
break;
}
......
......@@ -22,11 +22,11 @@
extern "C" {
#endif
#define META_SUPER_TABLE 0
#define META_CHILD_TABLE 1
#define META_NORMAL_TABLE 2
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
int metaValidateTbOptions(SMeta *pMeta, const STbCfg *);
int metaValidateTbCfg(SMeta *pMeta, const STbCfg *);
size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize);
#ifdef __cplusplus
......
......@@ -125,11 +125,11 @@ void metaCloseDB(SMeta *pMeta) {
}
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
tb_uid_t uid;
char buf[512];
void * pBuf;
DBT key, value;
STSchema *pSchema = NULL;
tb_uid_t uid;
char buf[512];
void * pBuf;
DBT key, value;
SSchema *pSchema = NULL;
if (pTbCfg->type == META_SUPER_TABLE) {
uid = pTbCfg->stbCfg.suid;
......@@ -156,9 +156,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
}
// save schema
uint32_t ncols;
if (pTbCfg->type == META_SUPER_TABLE) {
ncols = pTbCfg->stbCfg.nCols;
pSchema = pTbCfg->stbCfg.pSchema;
} else if (pTbCfg->type == META_NORMAL_TABLE) {
ncols = pTbCfg->ntbCfg.nCols;
pSchema = pTbCfg->ntbCfg.pSchema;
}
......@@ -166,12 +169,18 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
pBuf = buf;
memset(&key, 0, sizeof(key));
memset(&value, 0, sizeof(key));
SSchemaKey schemaKey = {uid, schemaVersion(pSchema)};
SSchemaKey schemaKey = {uid, 0 /*TODO*/};
key.data = &schemaKey;
key.size = sizeof(schemaKey);
tdEncodeSchema(&pBuf, pSchema);
taosEncodeFixedU32(&pBuf, ncols);
for (size_t i = 0; i < ncols; i++) {
taosEncodeFixedI8(&pBuf, pSchema[i].type);
taosEncodeFixedI32(&pBuf, pSchema[i].colId);
taosEncodeFixedI32(&pBuf, pSchema[i].bytes);
taosEncodeString(&pBuf, pSchema[i].name);
}
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
......@@ -367,7 +376,15 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
tsize += taosEncodeFixedU8(buf, pTbCfg->type);
if (pTbCfg->type == META_SUPER_TABLE) {
tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema);
tsize += taosEncodeVariantU32(buf, pTbCfg->stbCfg.nTagCols);
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) {
tsize += taosEncodeFixedI8(buf, pTbCfg->stbCfg.pSchema[i].type);
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pSchema[i].colId);
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pSchema[i].bytes);
tsize += taosEncodeString(buf, pTbCfg->stbCfg.pSchema[i].name);
}
// tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema);
} else if (pTbCfg->type == META_CHILD_TABLE) {
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
......@@ -386,7 +403,14 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
if (pTbCfg->type == META_SUPER_TABLE) {
buf = tdDecodeSchema(buf, &(pTbCfg->stbCfg.pTagSchema));
buf = taosDecodeVariantU32(buf, pTbCfg->stbCfg.nTagCols);
pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols);
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) {
buf = taosDecodeFixedI8(buf, &pTbCfg->stbCfg.pSchema[i].type);
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pSchema[i].name);
}
} else if (pTbCfg->type == META_CHILD_TABLE) {
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
......
......@@ -17,7 +17,7 @@
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
// Validate the tbOptions
if (metaValidateTbOptions(pMeta, pTbCfg) < 0) {
if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
......
......@@ -16,7 +16,7 @@
#include "metaDef.h"
#include "tcoding.h"
int metaValidateTbOptions(SMeta *pMeta, const STbCfg *pTbOptions) {
int metaValidateTbCfg(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册