提交 a0880e0d 编写于 作者: S Shengliang Guan

TD-1918

上级 03d7e80d
......@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0;
// write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer);
}
......@@ -24,7 +24,7 @@
int64_t ver = 0;
void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type) {
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
return 0;
}
......
......@@ -31,14 +31,6 @@ typedef struct {
pthread_t thread; // thread
} SWriteWorker;
typedef struct {
SRspRet rspRet;
SRpcMsg rpcMsg;
int32_t processedCount;
int32_t code;
int32_t contLen;
void * pCont;
} SWriteMsg;
typedef struct {
int32_t max; // max number of workers
......@@ -86,39 +78,38 @@ void dnodeCleanupVWrite() {
dInfo("dnode vwrite is closed");
}
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
char *pCont = pMsg->pCont;
void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
int32_t code;
char *pCont = pRpcMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
if (pRpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc);
}
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
SMsgHead *pMsg = (SMsgHead *)pCont;
pMsg->vgId = htonl(pMsg->vgId);
pMsg->contLen = htonl(pMsg->contLen);
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
void *pVnode = vnodeAcquire(pMsg->vgId);
if (pVnode == NULL) {
code = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_VND_INVALID_VGROUP_ID,
.msgType = 0
};
SWalHead *pHead = (SWalHead *)(pCont - sizeof(SWalHead));
pHead->msgType = pRpcMsg->msgType;
pHead->version = 0;
pHead->len = pMsg->contLen;
code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
}
void *dnodeAllocVWriteQueue(void *pVnode) {
......@@ -179,7 +170,7 @@ void dnodeFreeVWriteQueue(void *wqueue) {
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return;
SWriteMsg *pWrite = param;
SVWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
......@@ -187,26 +178,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (count <= 1) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.handle = pWrite->rpcHandle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pWrite->rpcMsg.pCont);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg * pWrite;
SWalHead * pHead;
SRspRet * pRspRet;
SWriteWorker *pWorker = param;
SVWriteMsg * pWrite;
void * pVnode;
void * pItem;
int32_t numOfMsgs;
int32_t qtype;
......@@ -220,36 +207,14 @@ static void *dnodeProcessWriteQueue(void *param) {
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &qtype, &pItem);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]);
} else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} else {
pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
}
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code));
pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite);
if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite) {
pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1;
}
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
}
walFsync(vnodeGetWal(pVnode));
......@@ -257,17 +222,15 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems(pWorker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, &pItem);
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else if (qtype == TAOS_QTYPE_FWD) {
pHead = pItem;
vnodeConfirmForward(pVnode, pHead->version, 0);
taosFreeQitem(pItem);
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
} else {
taosFreeQitem(pItem);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
}
......
......@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
typedef struct {
int vgId;
......
......@@ -71,7 +71,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
......
......@@ -43,8 +43,8 @@ typedef struct {
int8_t keep; // keep the wal file when closed
} SWalCfg;
typedef void* twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit();
void walCleanUp();
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "twal.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_READY,
......@@ -29,9 +31,9 @@ typedef enum _VN_STATUS {
} EVnStatus;
typedef struct {
int len;
void *rsp;
void *qhandle; //used by query and retrieve msg
int32_t len;
void * rsp;
void * qhandle; // used by query and retrieve msg
} SRspRet;
typedef struct {
......@@ -41,6 +43,16 @@ typedef struct {
SRpcMsg rpcMsg;
} SReadMsg;
typedef struct {
int32_t code;
int32_t processedCount;
void * rpcHandle;
void * rpcAhandle;
SRspRet rspRet;
char reserveForSync[16];
SWalHead pHead[];
} SVWriteMsg;
extern char *vnodeStatus[];
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
......@@ -55,7 +67,9 @@ void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg);
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite);
int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
......
......@@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
......
......@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
}
if (code < 0) {
......@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead);
return offset;
......
......@@ -61,8 +61,6 @@ typedef struct {
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type);
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
......
......@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteCqMsgToQueue;
cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode);
......@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) {
}
void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw;
int32_t vgId = pVnode->vgId;
......
......@@ -46,10 +46,10 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
}
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param1;
SWalHead * pHead = param2;
SVnodeObj *pVnode = param;
SWalHead * pHead = pWrite->pHead;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
......@@ -80,7 +80,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype);
if (syncCode < 0) return syncCode;
// write into WAL
......@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
pVnode->version = pHead->version;
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet);
if (code < 0) return code;
return syncCode;
......@@ -204,35 +204,31 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS;
}
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param;
SWalHead * pHead = data;
int size = sizeof(SWalHead) + pHead->len;
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) {
SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pSync);
if (qtype == TAOS_QTYPE_RPC && vnodeCheckWrite(pVnode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
return 0;
}
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
int vnodeWriteToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param;
SWalHead * pHead = data;
if (pMsg != NULL) {
SRpcMsg *pRpcMsg = pMsg;
pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle;
}
int size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal);
return 0;
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS;
}
......@@ -122,7 +122,7 @@ void walFsync(void *handle) {
}
}
int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) {
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1;
SWal * pWal = handle;
......@@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
}
tclose(fd);
......
......@@ -23,7 +23,7 @@
int64_t ver = 0;
void *pWal = NULL;
int writeToQueue(void *pVnode, void *data, int type) {
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
// do nothing
SWalHead *pHead = data;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册