提交 44dc05f3 编写于 作者: H Hongze Cheng

more refact meta

上级 3ed29b7b
...@@ -1558,13 +1558,23 @@ typedef struct { ...@@ -1558,13 +1558,23 @@ typedef struct {
int32_t code; int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp; } SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
typedef struct { typedef struct {
SArray* rspList; // SArray<SVCreateTbRsp> int32_t nRsps;
union {
SVCreateTbRsp* pRsps;
SArray* pArray;
};
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
......
...@@ -3418,6 +3418,36 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc ...@@ -3418,6 +3418,36 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc
return 0; return 0;
} }
int tEncodeSVCreateTbBatchRsp(SCoder *pCoder, const SVCreateTbBatchRsp *pRsp) {
int32_t nRsps = taosArrayGetSize(pRsp->pArray);
SVCreateTbRsp *pCreateRsp;
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, nRsps) < 0) return -1;
for (int32_t i = 0; i < nRsps; i++) {
pCreateRsp = taosArrayGet(pRsp->pArray, i);
if (tEncodeSVCreateTbRsp(pCoder, pCreateRsp) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int tDecodeSVCreateTbBatchRsp(SCoder *pCoder, SVCreateTbBatchRsp *pRsp) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pRsp->nRsps) < 0) return -1;
pRsp->pRsps = (SVCreateTbRsp *)TCODER_MALLOC(pCoder, sizeof(*pRsp->pRsps) * pRsp->nRsps);
for (int32_t i = 0; i < pRsp->nRsps; i++) {
if (tDecodeSVCreateTbRsp(pCoder, pRsp->pRsps + i) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
...@@ -3642,6 +3672,24 @@ int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) { ...@@ -3642,6 +3672,24 @@ int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) {
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
} }
tEndDecode(pCoder);
return 0;
}
int tEncodeSVCreateTbRsp(SCoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->code) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1;
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
\ No newline at end of file
...@@ -58,7 +58,7 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); ...@@ -58,7 +58,7 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); int vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
......
...@@ -126,10 +126,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { ...@@ -126,10 +126,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
return 0; return 0;
} }
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
uint32_t hashValue = 0; uint32_t hashValue = 0;
switch (pVnodeOptions->hashMethod) { switch (pVnode->config.hashMethod) {
default: default:
hashValue = MurmurHash3_32(tableFName, strlen(tableFName)); hashValue = MurmurHash3_32(tableFName, strlen(tableFName));
break; break;
...@@ -143,5 +143,5 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { ...@@ -143,5 +143,5 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
} }
#endif #endif
return TSDB_CODE_SUCCESS; return 0;
} }
...@@ -42,7 +42,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -42,7 +42,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
metaRsp.dbId = pVnode->config.dbId; metaRsp.dbId = pVnode->config.dbId;
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
code = vnodeValidateTableHash(&pVnode->config, tableFName); code = vnodeValidateTableHash(pVnode, tableFName);
if (code) { if (code) {
goto _exit; goto _exit;
} }
......
...@@ -249,8 +249,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -249,8 +249,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
int rcode = 0; int rcode = 0;
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq; SVCreateTbReq *pCreateReq;
SVCreateTbBatchRsp rsp = {0};
SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN];
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
// decode // decode
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER);
...@@ -260,17 +266,47 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -260,17 +266,47 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _exit; goto _exit;
} }
rsp.pArray = taosArrayInit(sizeof(cRsp), req.nReqs);
if (rsp.pArray == NULL) {
rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// loop to create table // loop to create table
for (int iReq = 0; iReq < req.nReqs; iReq++) { for (int iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
// validate hash
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) {
cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
taosArrayPush(rsp.pArray, &cRsp);
continue;
}
// do create table
if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) { if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
// TODO: fill request cRsp.code = terrno;
} else { } else {
// TODO cRsp.code = TSDB_CODE_SUCCESS;
} }
taosArrayPush(rsp.pArray, &cRsp);
} }
tCoderClear(&coder);
// prepare rsp // prepare rsp
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
if (pRsp->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
tCoderInit(&coder, TD_LITTLE_ENDIAN, pRsp->pCont, pRsp->contLen, TD_ENCODER);
tEncodeSVCreateTbBatchRsp(&coder, &rsp);
_exit: _exit:
tCoderClear(&coder); tCoderClear(&coder);
......
...@@ -1076,17 +1076,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1076,17 +1076,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SVCreateTbBatchRsp batchRsp = {0}; SVCreateTbBatchRsp batchRsp = {0};
if (msg) { if (msg) {
SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp)); SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp));
if (batchRsp.rspList) { if (batchRsp.pArray) {
int32_t num = taosArrayGetSize(batchRsp.rspList); int32_t num = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i); SVCreateTbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
taosArrayDestroy(batchRsp.rspList); taosArrayDestroy(batchRsp.pArray);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
} }
} }
taosArrayDestroy(batchRsp.rspList); taosArrayDestroy(batchRsp.pArray);
} }
} }
...@@ -2734,7 +2734,7 @@ void schedulerFreeTaskList(SArray *taskList) { ...@@ -2734,7 +2734,7 @@ void schedulerFreeTaskList(SArray *taskList) {
void schedulerDestroy(void) { void schedulerDestroy(void) {
if (schMgmt.jobRef) { if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0; int64_t refId = 0;
while (pJob) { while (pJob) {
refId = pJob->refId; refId = pJob->refId;
...@@ -2751,12 +2751,12 @@ void schedulerDestroy(void) { ...@@ -2751,12 +2751,12 @@ void schedulerDestroy(void) {
} }
if (schMgmt.hbConnections) { if (schMgmt.hbConnections) {
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SSchHbTrans *hb = pIter; SSchHbTrans *hb = pIter;
schFreeRpcCtx(&hb->rpcCtx); schFreeRpcCtx(&hb->rpcCtx);
pIter = taosHashIterate(schMgmt.hbConnections, pIter); pIter = taosHashIterate(schMgmt.hbConnections, pIter);
} }
taosHashCleanup(schMgmt.hbConnections); taosHashCleanup(schMgmt.hbConnections);
schMgmt.hbConnections = NULL; schMgmt.hbConnections = NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册