提交 ca45e6c9 编写于 作者: H Hongze Cheng

refact meta 4

上级 bd36dc99
...@@ -1479,7 +1479,7 @@ typedef struct SVCreateStbReq { ...@@ -1479,7 +1479,7 @@ typedef struct SVCreateStbReq {
SSchema* pSchema; SSchema* pSchema;
int16_t nTags; int16_t nTags;
SSchema* pSchemaTg; SSchema* pSchemaTg;
SRSmaParam* pRSmaParam; SRSmaParam pRSmaParam;
} SVCreateStbReq; } SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
......
...@@ -3076,7 +3076,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq ...@@ -3076,7 +3076,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
return 0; return 0;
} }
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
...@@ -3096,7 +3095,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq ...@@ -3096,7 +3095,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -3129,7 +3128,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR ...@@ -3129,7 +3128,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
return 0; return 0;
} }
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
...@@ -3723,16 +3721,17 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) { ...@@ -3723,16 +3721,17 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1; if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI16v(pCoder, &pReq->nCols) < 0) return -1;
// TCODER_MALLOC(pReq->pSchema, SSchema, sizeof(SSchema) * pReq->nCols, pCoder); if (tDecodeI16v(pCoder, &pReq->nCols) < 0) return -1;
pReq->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nCols); pReq->pSchema = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pReq->nCols);
if (pReq->pSchema == NULL) return -1;
for (int iCol = 0; iCol < pReq->nCols; iCol++) { for (int iCol = 0; iCol < pReq->nCols; iCol++) {
if (tDecodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1; if (tDecodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1;
} }
if (tDecodeI16v(pCoder, &pReq->nTags) < 0) return -1; if (tDecodeI16v(pCoder, &pReq->nTags) < 0) return -1;
// TCODER_MALLOC(pReq->pSchemaTg, SSchema, sizeof(SSchema) * pReq->nTags, pCoder); pReq->pSchemaTg = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pReq->nTags);
if (pReq->pSchemaTg == NULL) return -1;
pReq->pSchemaTg = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nTags); pReq->pSchemaTg = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nTags);
for (int iTag = 0; iTag < pReq->nTags; iTag++) { for (int iTag = 0; iTag < pReq->nTags; iTag++) {
if (tDecodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1; if (tDecodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1;
......
...@@ -394,66 +394,55 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch ...@@ -394,66 +394,55 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
} }
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
#if 0 SCoder coder;
SName name = {0}; int32_t contLen;
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFName); tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0}; SVCreateStbReq req = {0};
req.name = (char *)tNameGetTableName(&name); req.name = (char *)tNameGetTableName(&name);
req.ttl = 0; req.suid = pStb->uid;
req.keep = 0;
req.rollup = pStb->aggregationMethod > -1 ? 1 : 0; req.rollup = pStb->aggregationMethod > -1 ? 1 : 0;
req.type = TD_SUPER_TABLE; req.ttl = 0;
req.stbCfg.suid = pStb->uid; req.nCols = pStb->numOfColumns;
req.stbCfg.nCols = pStb->numOfColumns; req.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags; req.nTags = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pTags; req.pSchemaTg = pStb->pTags;
req.stbCfg.pSchema = (SSchema *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
if (req.stbCfg.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(req.stbCfg.pSchema, pStb->pColumns, sizeof(SSchema) * pStb->numOfColumns); // TODO: remove here
for (int i = 0; i < pStb->numOfColumns; i++) { for (int iCol = 0; iCol < req.nCols; iCol++) {
req.stbCfg.pSchema[i].flags = SCHEMA_SMA_ON; req.pSchema[iCol].flags = SCHEMA_SMA_ON;
} }
SRSmaParam *pRSmaParam = NULL;
if (req.rollup) { if (req.rollup) {
pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam)); req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
if (pRSmaParam == NULL) { req.pRSmaParam.delay = pStb->delay;
taosMemoryFreeClear(req.stbCfg.pSchema); req.pRSmaParam.nFuncIds = 1; // only 1 aggregation method supported currently
terrno = TSDB_CODE_OUT_OF_MEMORY; req.pRSmaParam.pFuncIds = (func_id_t *)taosMemoryCalloc(req.pRSmaParam.nFuncIds, sizeof(func_id_t));
return NULL; if (req.pRSmaParam.pFuncIds == NULL) {
}
pRSmaParam->xFilesFactor = pStb->xFilesFactor;
pRSmaParam->delay = pStb->delay;
pRSmaParam->nFuncIds = 1; // only 1 aggregation method supported currently
pRSmaParam->pFuncIds = (func_id_t *)taosMemoryCalloc(pRSmaParam->nFuncIds, sizeof(func_id_t));
if (pRSmaParam->pFuncIds == NULL) {
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) { for (int32_t f = 0; f < req.pRSmaParam.nFuncIds; ++f) {
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod; req.pRSmaParam.pFuncIds[f] = pStb->aggregationMethod;
} }
req.stbCfg.pRSmaParam = pRSmaParam;
} }
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); // get length
tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
if (tEncodeSVCreateStbReq(&coder, &req) < 0) {
taosMemoryFree(req.pRSmaParam.pFuncIds);
return NULL;
}
tCoderClear(&coder);
contLen = sizeof(SMsgHead) + coder.pos;
SMsgHead *pHead = taosMemoryMalloc(contLen); SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) { if (pHead == NULL) {
if (pRSmaParam) { taosMemoryFree(req.pRSmaParam.pFuncIds);
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
// taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -462,17 +451,16 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -462,17 +451,16 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req); tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
if (tEncodeSVCreateStbReq(&coder, &req) < 0) {
taosMemoryFree(req.pRSmaParam.pFuncIds);
return NULL;
}
tCoderClear(&coder);
*pContLen = contLen; *pContLen = contLen;
if (pRSmaParam) { taosMemoryFree(req.pRSmaParam.pFuncIds);
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
// taosMemoryFreeClear(req.stbCfg.pSchema);
return pHead; return pHead;
#endif
return NULL;
} }
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
......
...@@ -45,6 +45,9 @@ void metaCloseIdx(SMeta* pMeta); ...@@ -45,6 +45,9 @@ void metaCloseIdx(SMeta* pMeta);
int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions); int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions);
int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaTable ==================
int metaCreateSTable(SMeta* pMeta, SVCreateStbReq* pReq, SVCreateStbRsp* pRsp);
// metaCommit ================== // metaCommit ==================
int metaBegin(SMeta* pMeta); int metaBegin(SMeta* pMeta);
......
...@@ -15,6 +15,11 @@ ...@@ -15,6 +15,11 @@
#include "vnodeInt.h" #include "vnodeInt.h"
int metaCreateSTable(SMeta *pMeta, SVCreateStbReq *pReq, SVCreateStbRsp *pRsp) {
// TODO
return 0;
}
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
#ifdef META_REFACT #ifdef META_REFACT
#else #else
......
...@@ -196,20 +196,22 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -196,20 +196,22 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len) { static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len) {
SVCreateTbReq vCreateTbReq = {0}; SVCreateStbReq req = {0};
tDeserializeSVCreateTbReq(pReq, &vCreateTbReq); SCoder coder;
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
tCoderClear(&coder);
return -1; return -1;
} }
// taosMemoryFree(vCreateTbReq.stbCfg.pSchema); if (metaCreateSTable(pVnode->pMeta, pReq, NULL) < 0) {
// taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); tCoderClear(&coder);
// if (vCreateTbReq.stbCfg.pRSmaParam) { return -1;
// taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); }
// taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
// } tCoderClear(&coder);
// taosMemoryFree(vCreateTbReq.name);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册