未验证 提交 4d23e71b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4111 from taosdata/feature/wal

Feature/wal
...@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0; pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer); free(buffer);
} }
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pCq = NULL; void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type) { int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
return 0; return 0;
} }
......
...@@ -31,14 +31,6 @@ typedef struct { ...@@ -31,14 +31,6 @@ typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
} SWriteWorker; } SWriteWorker;
typedef struct {
SRspRet rspRet;
SRpcMsg rpcMsg;
int32_t processedCount;
int32_t code;
int32_t contLen;
void * pCont;
} SWriteMsg;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
...@@ -86,39 +78,38 @@ void dnodeCleanupVWrite() { ...@@ -86,39 +78,38 @@ void dnodeCleanupVWrite() {
dInfo("dnode vwrite is closed"); dInfo("dnode vwrite is closed");
} }
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) { void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
char *pCont = pMsg->pCont; int32_t code;
char *pCont = pRpcMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { if (pRpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont; SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc); pCont += sizeof(SMsgDesc);
} }
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pMsg = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId); pMsg->vgId = htonl(pMsg->vgId);
pHead->contLen = htonl(pHead->contLen); pMsg->contLen = htonl(pMsg->contLen);
taos_queue queue = vnodeAcquireWqueue(pHead->vgId); void *pVnode = vnodeAcquire(pMsg->vgId);
if (queue) { if (pVnode == NULL) {
// put message into queue code = TSDB_CODE_VND_INVALID_VGROUP_ID;
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
} else { } else {
SRpcMsg rpcRsp = { SWalHead *pHead = (SWalHead *)(pCont - sizeof(SWalHead));
.handle = pMsg->handle, pHead->msgType = pRpcMsg->msgType;
.pCont = NULL, pHead->version = 0;
.contLen = 0, pHead->len = pMsg->contLen;
.code = TSDB_CODE_VND_INVALID_VGROUP_ID, code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
.msgType = 0 }
};
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
} }
vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
} }
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
...@@ -179,7 +170,7 @@ void dnodeFreeVWriteQueue(void *wqueue) { ...@@ -179,7 +170,7 @@ void dnodeFreeVWriteQueue(void *wqueue) {
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return; if (param == NULL) return;
SWriteMsg *pWrite = param; SVWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -187,26 +178,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -187,26 +178,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (count <= 1) return; if (count <= 1) return;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcHandle,
.pCont = pWrite->rspRet.rsp, .pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len, .contLen = pWrite->rspRet.len,
.code = pWrite->code, .code = pWrite->code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pWrite->rpcMsg.pCont);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = param;
SWriteMsg * pWrite; SVWriteMsg * pWrite;
SWalHead * pHead;
SRspRet * pRspRet;
void * pVnode; void * pVnode;
void * pItem;
int32_t numOfMsgs; int32_t numOfMsgs;
int32_t qtype; int32_t qtype;
...@@ -220,36 +207,14 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -220,36 +207,14 @@ static void *dnodeProcessWriteQueue(void *param) {
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
pRspRet = NULL; dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosGetQitem(pWorker->qall, &qtype, &pItem); taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]);
} else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} else {
pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
}
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet); pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], if (pWrite->code <= 0) pWrite->processedCount = 1;
pHead->version, tstrerror(code));
if (pWrite) { dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1;
}
} }
walFsync(vnodeGetWal(pVnode)); walFsync(vnodeGetWal(pVnode));
...@@ -257,17 +222,15 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -257,17 +222,15 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(pWorker->qall); taosResetQitems(pWorker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, &pItem); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem; dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
} else if (qtype == TAOS_QTYPE_FWD) { } else if (qtype == TAOS_QTYPE_FWD) {
pHead = pItem; vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
vnodeConfirmForward(pVnode, pHead->version, 0); taosFreeQitem(pWrite);
taosFreeQitem(pItem);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} else { } else {
taosFreeQitem(pItem); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
} }
......
...@@ -21,7 +21,7 @@ extern "C" { ...@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
typedef struct { typedef struct {
int vgId; int vgId;
......
...@@ -70,8 +70,8 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin ...@@ -70,8 +70,8 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data // when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app // when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
......
...@@ -43,8 +43,8 @@ typedef struct { ...@@ -43,8 +43,8 @@ typedef struct {
int8_t keep; // keep the wal file when closed int8_t keep; // keep the wal file when closed
} SWalCfg; } SWalCfg;
typedef void* twalh; // WAL HANDLE typedef void * twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit(); int32_t walInit();
void walCleanUp(); void walCleanUp();
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "twal.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_READY, TAOS_VN_STATUS_READY,
...@@ -29,9 +31,9 @@ typedef enum _VN_STATUS { ...@@ -29,9 +31,9 @@ typedef enum _VN_STATUS {
} EVnStatus; } EVnStatus;
typedef struct { typedef struct {
int len; int32_t len;
void *rsp; void * rsp;
void *qhandle; //used by query and retrieve msg void * qhandle; // used by query and retrieve msg
} SRspRet; } SRspRet;
typedef struct { typedef struct {
...@@ -41,6 +43,16 @@ typedef struct { ...@@ -41,6 +43,16 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SReadMsg; } SReadMsg;
typedef struct {
int32_t code;
int32_t processedCount;
void * rpcHandle;
void * rpcAhandle;
SRspRet rspRet;
char reserveForSync[16];
SWalHead pHead[];
} SVWriteMsg;
extern char *vnodeStatus[]; extern char *vnodeStatus[];
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
...@@ -51,11 +63,11 @@ int32_t vnodeClose(int32_t vgId); ...@@ -51,11 +63,11 @@ int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg);
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
......
...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall; ...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall;
static taos_queue tsSdbWriteQueue; static taos_queue tsSdbWriteQueue;
static SSdbWriteWorkerPool tsSdbPool; static SSdbWriteWorkerPool tsSdbPool;
static int sdbWrite(void *param, void *data, int type); static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg);
static int sdbWriteToQueue(void *param, void *data, int type); static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg);
static void * sdbWorkerFp(void *param); static void * sdbWorkerFp(void *param);
static int32_t sdbInitWriteWorker(); static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker(); static void sdbCleanupWriteWorker();
...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int sdbWrite(void *param, void *data, int type) { static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
SSdbOper *pOper = param; SSdbOper *pOper = param;
SWalHead *pHead = data; SWalHead *pHead = data;
int32_t tableId = pHead->msgType / 10; int32_t tableId = pHead->msgType / 10;
...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() { ...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() {
tsSdbWriteQueue = NULL; tsSdbWriteQueue = NULL;
} }
int sdbWriteToQueue(void *param, void *data, int type) { int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
SWalHead *pHead = data; SWalHead *pHead = data;
int size = sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
taosWriteQitem(tsSdbWriteQueue, type, pWal); taosWriteQitem(tsSdbWriteQueue, qtype, pWal);
return 0; return 0;
} }
...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) {
pOper = NULL; pOper = NULL;
} }
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type, NULL);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pOper) { if (pOper) {
pOper->retCode = code; pOper->retCode = code;
......
...@@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead); syncSaveIntoBuffer(pPeer, pHead);
......
...@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) { ...@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
if (ret < 0) break; if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
} }
if (code < 0) { if (code < 0) {
...@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { ...@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset; SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead); offset += pHead->len + sizeof(SWalHead);
return offset; return offset;
......
...@@ -61,8 +61,6 @@ typedef struct { ...@@ -61,8 +61,6 @@ typedef struct {
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type);
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void); void vnodeInitWriteFp(void);
void vnodeInitReadFp(void); void vnodeInitReadFp(void);
......
...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteCqMsgToQueue; cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) {
} }
void vnodeRelease(void *pVnodeRaw) { void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw; SVnodeObj *pVnode = pVnodeRaw;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
...@@ -482,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) { ...@@ -482,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) {
return pVnode->rqueue; return pVnode->rqueue;
} }
void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue;
}
void *vnodeGetWal(void *pVnode) { void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal; return ((SVnodeObj *)pVnode)->wal;
} }
......
...@@ -46,10 +46,10 @@ void vnodeInitWriteFp(void) { ...@@ -46,10 +46,10 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
} }
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param1; SVnodeObj *pVnode = param;
SWalHead * pHead = param2; SWalHead * pHead = pWrite->pHead;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
...@@ -80,7 +80,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -80,7 +80,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write into WAL // write into WAL
...@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet);
if (code < 0) return code; if (code < 0) return code;
return syncCode; return syncCode;
...@@ -204,35 +204,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -204,35 +204,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vparam;
SWalHead * pHead = data; SWalHead * pHead = wparam;
int size = sizeof(SWalHead) + pHead->len;
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); if (qtype == TAOS_QTYPE_RPC) {
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
taosWriteQitem(pVnode->wqueue, type, pSync); }
return 0; int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
} SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
int vnodeWriteToQueue(void *param, void *data, int type) { if (pMsg != NULL) {
SVnodeObj *pVnode = param; SRpcMsg *pRpcMsg = pMsg;
SWalHead * pHead = data; pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle;
}
int size = sizeof(SWalHead) + pHead->len; memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS;
return 0;
} }
...@@ -122,7 +122,7 @@ void walFsync(void *handle) { ...@@ -122,7 +122,7 @@ void walFsync(void *handle) {
} }
} }
int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1; if (handle == NULL) return -1;
SWal * pWal = handle; SWal * pWal = handle;
...@@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (pWal->keep) pWal->version = pHead->version; if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }
tclose(fd); tclose(fd);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pWal = NULL; void *pWal = NULL;
int writeToQueue(void *pVnode, void *data, int type) { int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
// do nothing // do nothing
SWalHead *pHead = data; SWalHead *pHead = data;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册