提交 167657c1 编写于 作者: H Hongze Cheng

refactor: sync

上级 eeb45f21
......@@ -97,7 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
int64_t version;
SRpcMsg rsp;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
......@@ -116,13 +116,15 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
}
#if 1
int64_t version;
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg rsp;
rsp.pCont = NULL;
rsp.contLen = 0;
......@@ -132,20 +134,9 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp);
#if 0
if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle;
taosMemoryFree(pRsp);
} else {
if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
#endif
}
#else
// sync integration response
/*
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg;
SRpcMsg *pRpc;
......@@ -153,18 +144,18 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg;
// set request version
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
// int64_t ver = pVnode->pImpl->state.processed++; // ???????
int64_t ver;
taosEncodeFixedI64(&pBuf, ver);
rsp.ahandle = pRpc->ahandle;
rsp.handle = pRpc->handle;
rsp.pCont = NULL;
rsp.contLen = 0;
int32_t ret = syncPropose(pVnode->pImpl->sync, pRpc, false);
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
// not leader
// send response
rsp.code = -1;
tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
// send response
rsp.code = -2;
tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// ok
// send response in applyQ
......@@ -172,7 +163,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
assert(0);
}
}
*/
#endif
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
......@@ -187,13 +178,25 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) {
#if 0
taosGetQitem(qall, (void **)&pMsg);
// todo
SRpcMsg *pRsp = NULL;
// (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle;
rsp.code = 0;
rsp.pCont = NULL;
rsp.contLen = 0;
if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &rsp) < 0) {
rsp.code = terrno;
tmsgSendRsp(&rsp);
}
}
#endif
}
}
......
......@@ -21,10 +21,10 @@
#include "tqueue.h"
#include "trpc.h"
#include "sync.h"
#include "tarray.h"
#include "tfs.h"
#include "wal.h"
#include "sync.h"
#include "tcommon.h"
#include "tfs.h"
......@@ -61,6 +61,8 @@ int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
int64_t vnodeGetSyncHandle(SVnode *pVnode);
// meta
typedef struct SMeta SMeta; // todo: remove
typedef struct SMTbCursor SMTbCursor;
......@@ -148,7 +150,7 @@ struct SVnodeCfg {
bool isWeak;
STsdbCfg tsdbCfg;
SWalCfg walCfg;
SSyncCfg syncCfg; // sync integration
SSyncCfg syncCfg; // sync integration
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
......
......@@ -54,8 +54,8 @@ typedef struct SQWorkerMgmt SQHandle;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
typedef struct {
int8_t streamType; // sma or other
......
......@@ -165,3 +165,5 @@ void vnodeClose(SVnode *pVnode) {
taosMemoryFree(pVnode);
}
}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册