vnodeWrite.c 12.2 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
S
Shengliang Guan 已提交
20
#include "tglobal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
#include "tqueue.h"
S
Shengliang Guan 已提交
22 23
#include "ttimer.h"
#include "dnode.h"
24
#include "vnodeStatus.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
25

S
TD-2321  
Shengliang Guan 已提交
26
// Write-queue depth threshold: above it, enqueueing sleeps briefly, and at or above it
// RPC writes go through vnodePerformFlowCtrl()'s throttling path.
#define MAX_QUEUED_MSG_NUM 10000
S
TD-2014  
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
// Timer handle defined elsewhere; passed to taosTmrReset() when a write is deferred by flow control.
extern void *  tsDnodeTmr;
S
TD-2014  
Shengliang Guan 已提交
29 30 31 32 33 34 35
// Dispatch table indexed by message type; entries are filled in by vnodeInitWrite() and
// unregistered types stay NULL. Each handler receives the vnode, the message content and
// an optional response holder.
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
// Handlers registered in the dispatch table, one per supported write message type.
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
S
Shengliang Guan 已提交
36
// Forward declaration: used by vnodeWriteToWQueue() and by the flow-control timer callback,
// both of which are defined before the function itself.
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37

38
int32_t vnodeInitWrite(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39 40 41 42 43
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
44
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
45 46

  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
}

49 50
// Counterpart of vnodeInitWrite(); currently there is nothing to release.
void vnodeCleanupWrite() {}

S
TD-1842  
Shengliang Guan 已提交
51
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
S
TD-2166  
Shengliang Guan 已提交
52 53 54
  int32_t    code = 0;
  SVnodeObj *pVnode = vparam;
  SWalHead * pHead = wparam;
S
TD-2677  
Shengliang Guan 已提交
55
  SVWriteMsg*pWrite = rparam;
S
TD-2677  
Shengliang Guan 已提交
56 57 58

  SRspRet *pRspRet = NULL;
  if (pWrite != NULL) pRspRet = &pWrite->rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59

S
add log  
Shengliang Guan 已提交
60
  if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
S
Shengliang Guan 已提交
61 62
    vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
           taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version);
S
TD-1652  
Shengliang Guan 已提交
63
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
S
add log  
Shengliang Guan 已提交
64
  }
J
Jeff Tao 已提交
65

S
Shengliang Guan 已提交
66 67 68
  vTrace("vgId:%d, msg:%s will be processed in vnode, qtype:%s hver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId,
         taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version);

S
TD-1652  
Shengliang Guan 已提交
69
  if (pHead->version == 0) {  // from client or CQ
70
    if (!vnodeInReadyStatus(pVnode)) {
S
Shengliang Guan 已提交
71 72
      vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId,
             taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version);
S
Shengliang Guan 已提交
73
      return TSDB_CODE_APP_NOT_READY;  // it may be in deleting or closing state
S
add log  
Shengliang Guan 已提交
74
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75

76
    if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
S
Shengliang Guan 已提交
77 78
      vDebug("vgId:%d, msg:%s not processed since replica:%d role:%s, qtype:%s hver:%" PRIu64, pVnode->vgId,
             taosMsg[pHead->msgType], pVnode->syncCfg.replica, syncRole[pVnode->role], qtypeStr[qtype], pHead->version);
S
Shengliang Guan 已提交
79
      return TSDB_CODE_APP_NOT_READY;
S
add log  
Shengliang Guan 已提交
80
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81 82

    // assign version
S
TD-1652  
Shengliang Guan 已提交
83
    pHead->version = pVnode->version + 1;
S
TD-1652  
Shengliang Guan 已提交
84
  } else {  // from wal or forward
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
85 86 87 88
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= pVnode->version) return 0;
  }

S
TD-1652  
Shengliang Guan 已提交
89 90
  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
  int32_t syncCode = 0;
S
TD-2677  
Shengliang Guan 已提交
91
  syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype);
S
TD-1652  
Shengliang Guan 已提交
92
  if (syncCode < 0) return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94 95

  // write into WAL
  code = walWrite(pVnode->wal, pHead);
S
TD-2640  
Shengliang Guan 已提交
96 97 98 99
  if (code < 0) {
    vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
    return code;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100

S
TD-1652  
Shengliang Guan 已提交
101
  pVnode->version = pHead->version;
102

S
TD-1652  
Shengliang Guan 已提交
103
  // write data locally
S
TD-1842  
Shengliang Guan 已提交
104
  code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
105
  if (code < 0) return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106

107
  return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109
}

S
TD-2605  
Shengliang Guan 已提交
110
/*
 * Checks whether the vnode may currently accept a write.
 *
 * Returns TSDB_CODE_SUCCESS when it may, otherwise the first failing condition:
 * TSDB_CODE_VND_NO_WRITE_AUTH (write access bit not set), TSDB_CODE_VND_IS_BALANCING
 * (replica counts differ and the last configured replica is this dnode),
 * TSDB_CODE_APP_NOT_READY (no tsdb handle) or TSDB_CODE_VND_IS_FULL.
 */
static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
  if ((pVnode->accessState & TSDB_VN_WRITE_ACCCESS) == 0) {
    vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

  // only look at the node list when the db and vgroup replica counts disagree
  if (pVnode->dbReplica != pVnode->syncCfg.replica) {
    int32_t lastIdx = pVnode->syncCfg.replica - 1;
    if (pVnode->syncCfg.nodeInfo[lastIdx].nodeId == dnodeGetDnodeId()) {
      vDebug("vgId:%d, vnode is balancing and will be dropped, dbReplica:%d vgReplica:%d, refCount:%d pVnode:%p",
             pVnode->vgId, pVnode->dbReplica, pVnode->syncCfg.replica, pVnode->refCount, pVnode);
      return TSDB_CODE_VND_IS_BALANCING;
    }
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
    vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  if (pVnode->isFull) {
    vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount);
    return TSDB_CODE_VND_IS_FULL;
  }

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
137
/*
 * Handles a submit (insert) message by passing its content to tsdbInsertData().
 *
 * pVnode: vnode whose tsdb receives the data.
 * pCont:  message content handed to tsdb unchanged.
 * pRet:   optional response holder; when given, a SShellSubmitRspMsg buffer is allocated
 *         with rpcMallocCont() and passed to tsdb to be filled in.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when tsdbInsertData() fails.
 */
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  int32_t code = TSDB_CODE_SUCCESS;

  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

  // save insert result into item
  SShellSubmitRspMsg *pRsp = NULL;
  if (pRet) {
    pRet->len = sizeof(SShellSubmitRspMsg);
    pRet->rsp = rpcMallocCont(pRet->len);
    if (pRet->rsp == NULL) {
      // never leave a non-zero length next to a NULL buffer; the insert itself still
      // proceeds, exactly as it does when no response holder is supplied
      vError("vgId:%d, failed to allocate submit rsp", pVnode->vgId);
      pRet->len = 0;
    }
    pRsp = pRet->rsp;
  }

  if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;

  return code;
}

J
Jeff Tao 已提交
155
/*
 * Handles a create-table message: builds a table configuration from the message,
 * creates the table in tsdb and releases the configuration again.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when building the configuration or creating
 * the table fails. pRet is not used.
 */
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDCreateTableMsg *pCreate = (SMDCreateTableMsg *)pCont;

  STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(pCreate);
  if (pTableCfg == NULL) {
    ASSERT(terrno != 0);
    return terrno;
  }

  int result = TSDB_CODE_SUCCESS;
  if (tsdbCreateTable(pVnode->tsdb, pTableCfg) < 0) {
    result = terrno;
    ASSERT(result != 0);
  }

  // the configuration is released on both the success and the failure path
  tsdbClearTableCfg(pTableCfg);
  return result;
}

J
Jeff Tao 已提交
173
/*
 * Handles a drop-table message. The uid and tid carried by the message are converted
 * with htobe64()/htonl() before the table is dropped from tsdb.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable() fails. pRet is not used.
 */
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropTableMsg *pDrop = pCont;

  vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pDrop->tableId);

  STableId target = {.uid = htobe64(pDrop->uid), .tid = htonl(pDrop->tid)};
  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
185
/*
 * Handles an alter-table message. Nothing is changed here: the message is only logged
 * and TSDB_CODE_SUCCESS is returned. pCont and pRet are not used.
 */
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  // TODO: disposed in tsdb
  vDebug("vgId:%d, alter table msg is received", pVnode->vgId);

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
196
/*
 * Handles a drop-super-table message. The table is addressed by its uid (converted with
 * htobe64()) together with a tid of -1, and the outcome is logged.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable() fails. pRet is not used.
 */
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SDropSTableMsg *pDrop = pCont;

  vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pDrop->tableId);

  STableId target = {.uid = htobe64(pDrop->uid), .tid = -1};

  int32_t result = TSDB_CODE_SUCCESS;
  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    result = terrno;
  }

  vDebug("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pDrop->tableId, tstrerror(result));

  return result;
}

H
TD-90  
Hongze Cheng 已提交
211
/*
 * Handles an update-tag-value message by handing its content to tsdb.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when tsdbUpdateTableTagValue() fails.
 * pRet is not used.
 */
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SUpdateTableTagValMsg *pUpdate = (SUpdateTableTagValMsg *)pCont;

  int32_t result = TSDB_CODE_SUCCESS;
  if (tsdbUpdateTableTagValue(pVnode->tsdb, pUpdate) < 0) {
    result = terrno;
  }

  return result;
}

S
TD-2605  
Shengliang Guan 已提交
218
/*
 * Builds the queue item for one write: allocates it, copies the WAL head together with
 * its payload into it and records the vnode, the queue type and (when given) the RPC
 * message.
 *
 * On success the vnode's refCount is incremented by one and the new item is returned.
 * On failure NULL is returned and terrno is set to TSDB_CODE_WAL_SIZE_LIMIT (payload
 * longer than TSDB_MAX_WAL_SIZE) or TSDB_CODE_VND_OUT_OF_MEMORY (allocation failed).
 */
static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32_t qtype, SRpcMsg *pRpcMsg) {
  if (pHead->len > TSDB_MAX_WAL_SIZE) {
    vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
    terrno = TSDB_CODE_WAL_SIZE_LIMIT;
    return NULL;
  }

  // the head and its payload are stored right behind the fixed part of the item
  int32_t     itemSize = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
  SVWriteMsg *pItem = taosAllocateQitem(itemSize);
  if (pItem == NULL) {
    terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
    return NULL;
  }

  pItem->pVnode = pVnode;
  pItem->qtype = qtype;
  if (pRpcMsg != NULL) {
    pItem->rpcMsg = *pRpcMsg;
  }
  memcpy(&pItem->pHead, pHead, sizeof(SWalHead) + pHead->len);

  // the item keeps the vnode referenced until it is freed or rejected
  atomic_add_fetch_32(&pVnode->refCount, 1);

  return pItem;
}

/*
 * Puts a prepared write item into the write queue of its vnode.
 *
 * RPC items must first pass vnodeCheckWrite(); every item requires the vnode to be in
 * ready or updating status. When the item is rejected it is freed, the vnode is released
 * and the error code is returned, so the caller must not touch pWrite afterwards.
 * When more than MAX_QUEUED_MSG_NUM items are queued, the caller is delayed by up to
 * 100 ms before the item is written into the queue.
 *
 * Returns TSDB_CODE_SUCCESS once the item is queued.
 */
static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
  SVnodeObj *pVnode = pWrite->pVnode;
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    queued = 0;

  if (pWrite->qtype == TAOS_QTYPE_RPC) {
    code = vnodeCheckWrite(pVnode);
    if (code != TSDB_CODE_SUCCESS) {
      vError("vgId:%d, failed to write into vwqueue since %s", pVnode->vgId, tstrerror(code));
      goto _reject;
    }
  }

  if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
    vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
           vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
    code = TSDB_CODE_APP_NOT_READY;
    goto _reject;
  }

  queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
  if (queued > MAX_QUEUED_MSG_NUM) {
    // back off longer the deeper the queue is, capped at 100 ms
    int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
    if (ms > 100) ms = 100;
    vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
    taosMsleep(ms);
  }

  vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);

  taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
  return TSDB_CODE_SUCCESS;

_reject:
  // the item never reached the queue: drop it and the vnode reference it held
  taosFreeQitem(pWrite);
  vnodeRelease(pVnode);
  return code;
}
S
TD-2014  
Shengliang Guan 已提交
279

S
TD-2605  
Shengliang Guan 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292
/*
 * Entry point for queueing a write: builds the queue item, applies flow control and
 * writes the item into the vnode's write queue.
 *
 * Returns terrno when the item cannot be built, 0 when flow control deferred the item
 * (vnodePerformFlowCtrl() returned non-zero), and otherwise the result of
 * vnodeWriteToWQueueImp().
 */
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
  SVWriteMsg *pItem = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam);
  if (pItem == NULL) {
    assert(terrno != 0);
    return terrno;
  }

  if (vnodePerformFlowCtrl(pItem) != 0) {
    // deferred: the flow-control timer has been armed with the item
    return 0;
  }

  return vnodeWriteToWQueueImp(pItem);
}

S
TD-2014  
Shengliang Guan 已提交
293 294 295
/*
 * Disposes of a write item: decrements the vnode's queued-message counter, frees the
 * item and releases the vnode.
 *
 * vparam: the SVnodeObj the item belongs to.
 * pWrite: the item to free; it must not be used afterwards.
 */
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
  SVnodeObj *pVnode = vparam;

  int32_t remaining = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
  vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
         remaining);

  taosFreeQitem(pWrite);
  vnodeRelease(pVnode);
}
S
Shengliang Guan 已提交
302

S
Shengliang Guan 已提交
303
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
S
Shengliang Guan 已提交
304 305
  SVWriteMsg *pWrite = param;
  SVnodeObj * pVnode = pWrite->pVnode;
S
TD-2429  
Shengliang Guan 已提交
306
  int32_t     code = TSDB_CODE_VND_IS_SYNCING;
S
Shengliang Guan 已提交
307

S
TD-2321  
Shengliang Guan 已提交
308 309
  if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL;

S
Shengliang Guan 已提交
310 311
  pWrite->processedCount++;
  if (pWrite->processedCount > 100) {
S
Shengliang Guan 已提交
312 313
    vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code),
           pWrite->processedCount);
S
Shengliang Guan 已提交
314
    pWrite->processedCount = 1;
S
Shengliang Guan 已提交
315
    dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
S
Shengliang Guan 已提交
316 317 318
  } else {
    code = vnodePerformFlowCtrl(pWrite);
    if (code == 0) {
S
Shengliang Guan 已提交
319 320
      vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite,
             pWrite->processedCount);
S
Shengliang Guan 已提交
321
      pWrite->processedCount = 0;
S
TD-2605  
Shengliang Guan 已提交
322 323 324 325
      code = vnodeWriteToWQueueImp(pWrite);
      if (code != 0) {
        dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
      }
S
Shengliang Guan 已提交
326
    }
S
Shengliang Guan 已提交
327 328 329 330 331
  }
}

/*
 * Decides whether a write has to be throttled before it is queued.
 *
 * Only RPC writes are throttled, and only when at least MAX_QUEUED_MSG_NUM messages are
 * queued or the vnode's flowctrlLevel is positive.
 *  - tsEnableFlowCtrl == 0: the calling thread sleeps for 2^(flowctrlLevel + 2) ms,
 *    capped at 100 ms, and 0 is returned so the write proceeds.
 *  - otherwise: a 100 ms timer running vnodeFlowCtrlMsgToWQueue() is armed with the
 *    write and TSDB_CODE_VND_ACTION_IN_PROGRESS is returned; the write is then handled
 *    by that callback.
 *
 * Returns 0 when the caller may queue the write now.
 */
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
  SVnodeObj *pVnode = pWrite->pVnode;

  if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;

  int32_t throttle = (pVnode->queuedWMsg >= MAX_QUEUED_MSG_NUM) || (pVnode->flowctrlLevel > 0);
  if (!throttle) return 0;

  if (tsEnableFlowCtrl != 0) {
    // the timer id is not needed afterwards
    void *unUsed = NULL;
    taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed);

    vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, retry:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
           pWrite->processedCount);
    return TSDB_CODE_VND_ACTION_IN_PROGRESS;
  }

  int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
  if (ms > 100) ms = 100;
  vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
  taosMsleep(ms);
  return 0;
}

S
Shengliang Guan 已提交
351 352 353 354 355 356
/*
 * Blocks the caller until the vnode's queued-message counter is no longer positive,
 * polling it every 10 ms.
 */
void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
  for (;;) {
    if (pVnode->queuedWMsg <= 0) break;

    vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
    taosMsleep(10);
  }
}