提交 d1cd6206 编写于 作者: S Shengliang Guan

TD-1455 TD-2196

上级 97434ac0
......@@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (count <= 1) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcHandle,
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
......@@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1;
......
......@@ -23,7 +23,7 @@ extern "C" {
#define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum _TAOS_SYNC_ROLE {
typedef enum {
TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING = 2,
......@@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole;
typedef enum _TAOS_SYNC_STATUS {
typedef enum {
TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE = 2,
......@@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds);
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notity app
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include "trpc.h"
#include "twal.h"
typedef enum _VN_STATUS {
......@@ -51,8 +52,9 @@ typedef struct {
typedef struct {
int32_t code;
int32_t processedCount;
void * rpcHandle;
void * rpcAhandle;
int32_t qtype;
void * pVnode;
SRpcMsg rpcMsg;
SRspRet rspRet;
char reserveForSync[16];
SWalHead pHead[];
......
......@@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) {
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
pPeer->fileChanged = 0;
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (pPeer->syncFd < 0) {
......@@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) {
}
if (pPeer->fileChanged) {
// if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
(*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2));
} else {
pPeer->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
......
......@@ -39,7 +39,7 @@ typedef struct {
int32_t refCount; // reference count
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t delayMs;
int32_t flowctlLevel;
int8_t status;
int8_t role;
int8_t accessState;
......
......@@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds);
static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
......@@ -677,17 +677,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) {
vnodeRelease(pVnode);
}
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) {
static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while ctrl flow", vgId);
return;
}
if (pVnode->delayMs != mseconds) {
pVnode->delayMs = mseconds;
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
}
pVnode->flowctlLevel = level;
vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level);
vnodeRelease(pVnode);
}
......
......@@ -22,14 +22,17 @@
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
#include "ttimer.h"
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "syncInt.h"
#include "tcq.h"
#include "dnode.h"
#define MAX_QUEUED_MSG_NUM 10000
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
......@@ -47,6 +50,32 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
}
static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg);
if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) {
vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code));
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
}
tfree(pWrite);
vnodeRelease(pWrite->pVnode);
}
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pVnode->flowctlLevel <= 0) return 0;
int32_t ms = pVnode->flowctlLevel * 5;
void * unUsed = NULL;
taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed);
vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms);
return TSDB_CODE_RPC_ACTION_IN_PROGRESS;
}
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0;
SVnodeObj *pVnode = vparam;
......@@ -77,8 +106,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// assign version
pHead->version = pVnode->version + 1;
if (pVnode->delayMs) taosMsleep(pVnode->delayMs);
} else { // from wal or forward
// for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0;
......@@ -218,9 +245,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam;
int32_t code = 0;
if (qtype == TAOS_QTYPE_RPC) {
int32_t code = vnodeCheckWrite(pVnode);
code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
......@@ -237,14 +265,18 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam;
pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle;
pWrite->rpcMsg = *pRpcMsg;
}
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
pWrite->pVnode = pVnode;
pWrite->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1);
code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return code;
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册