vnodeSvr.c 43.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/*
 * Forward declarations of the per-message write handlers defined in this file.
 * All handlers share one signature: the vnode, the version passed in by the
 * dispatcher, the request payload and its length, and the response to fill.
 */

/* super table / table metadata */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data: submit / delete / trim / sma */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* vnode alteration */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
33

34
/**
 * Pre-process a write message in place, depending on its type.
 *
 *  - TDMT_VND_CREATE_TABLE: walks every request of the encoded batch and
 *    overwrites the 16 bytes at the current decode position with a freshly
 *    generated table uid followed by the current timestamp (ms).
 *  - TDMT_VND_SUBMIT: for every table block flagged SUBMIT_REQ_AUTO_CREATE_TABLE,
 *    patches the same two fields, reusing the uid already registered under the
 *    table name when metaGetTableEntryUidByName() returns a non-zero value.
 *  - TDMT_VND_DELETE: runs qWorkerProcessDeleteMsg() and replaces pMsg->pCont
 *    with a newly allocated buffer holding an SMsgHead plus the encoded SDeleteRes.
 *  - any other type: left untouched.
 *
 * Every failure leaves through the single _err label, so the decoder is always
 * cleared and the failure is always logged.
 *
 * @param pVnode  vnode the message is addressed to
 * @param pMsg    message to patch; its content may be rewritten or replaced
 * @return 0 on success, otherwise the error code describing the failure
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      if (tStartDecode(&dc) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _err;
      }

      if (tDecodeI32v(&dc, &nReqs) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _err;
      }

      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;

        if (tStartDecode(&dc) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          goto _err;
        }

        // skip the leading varint field, then read the table name
        if (tDecodeI32v(&dc, NULL) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          goto _err;
        }
        if (tDecodeCStr(&dc, &name) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          goto _err;
        }

        // Patch uid and ctime directly into the message buffer.
        // NOTE(review): nothing here verifies that 16 bytes remain after dc.pos
        // or that the address is suitably aligned - confirm against the encoder.
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;

    case TDMT_VND_SUBMIT: {
      int64_t  ctime = taosGetTimestampMs();
      uint64_t nSubmitTbData;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      if (tStartDecode(&dc) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _err;
      }

      if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _err;
      }

      // index has the same type as the decoded count to avoid a signed/unsigned compare
      for (uint64_t i = 0; i < nSubmitTbData; i++) {
        int32_t flags;

        if (tStartDecode(&dc) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          goto _err;
        }

        if (tDecodeI32v(&dc, &flags) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          goto _err;
        }

        if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
          char *name = NULL;

          if (tStartDecode(&dc) < 0) {
            code = TSDB_CODE_INVALID_MSG;
            goto _err;
          }

          if (tDecodeI32v(&dc, NULL) < 0) {
            code = TSDB_CODE_INVALID_MSG;
            goto _err;
          }

          if (tDecodeCStr(&dc, &name) < 0) {
            code = TSDB_CODE_INVALID_MSG;
            goto _err;
          }

          // reuse the uid already bound to this name, otherwise generate one
          int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }

          // NOTE(review): same unchecked 16-byte in-place patch as in the create-table case
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;

          tEndDecode(&dc);
        }

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;

    case TDMT_VND_DELETE: {
      int32_t     size = 0;
      int32_t     ret = 0;
      uint8_t    *pCont;
      SEncoder    coder = {0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) {
        goto _err;
      }

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // keep the original message intact; only the delete result is dropped
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
      ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

      tEncoderInit(&coder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(&coder, &res);
      tEncoderClear(&coder);

      // swap the request body for the encoded delete result
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;

    default:
      break;
  }

  return code;

_err:
  // releases whatever decode frames are still open; dc is zero-initialised when
  // the failing path never called tDecoderInit()
  tDecoderClear(&dc);
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

178
/**
 * Apply one write message to the vnode.
 *
 * The request is rejected when the vnode is not in use or when `version` is
 * not newer than the last applied one. Otherwise the applied version/term are
 * recorded, the SMsgHead is skipped and the payload is dispatched on
 * pMsg->msgType. After a successful dispatch the WAL is told about the applied
 * version, the message is pushed to TQ, and a commit is triggered when
 * vnodeShouldCommit() says so.
 *
 * @param pVnode   target vnode
 * @param pMsg     request (pCont starts with an SMsgHead)
 * @param version  version of this request; must be greater than state.applied
 * @param pRsp     response filled in by the individual handlers
 * @return 0 on success, -1 on failure
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  if (!pVnode->inUse) {
    terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
    vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  if (version <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
           pVnode->state.applied);
    terrno = TSDB_CODE_VND_DUP_REQUEST;
    pRsp->info.handle = NULL;
    return -1;
  }

  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  // the applied version/term are advanced before the handler runs
  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
  pVnode->state.applied = version;
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_BATCH_DEL:
      if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_TMQ_SUBSCRIBE:
      if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DELETE_SUB:
      // this handler receives the whole message, header included
      if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_ADD_CHECKINFO:
      if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DEL_CHECKINFO:
      if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
      if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_CHECK_RSP: {
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) {
        goto _err;
      }
    } break;
    // the three alter handlers below have their return value ignored
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      // explicit commit: skips the WAL/TQ/commit-check steps below
      vnodeSyncCommit(pVnode);
      vnodeBegin(pVnode);
      goto _exit;
    default:
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
  }

  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
         version);

  walApplyVer(pVnode->pWal, version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
#if 0
    vnodeSyncCommit(pVnode);
#else
    vnodeAsyncCommit(pVnode);
#endif

    // start a new one
    if (vnodeBegin(pVnode) < 0) {
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
  }

_exit:
  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

347
/**
 * Hand query / merge-query messages to the query worker for pre-processing.
 * Every other message type is accepted unchanged.
 *
 * @return 0 for message types that need no pre-processing, otherwise the
 *         result of qWorkerPreprocessQueryMsg()
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  const bool isQuery = (pMsg->msgType == TDMT_SCH_QUERY);
  const bool isMergeQuery = (pMsg->msgType == TDMT_SCH_MERGE_QUERY);

  if (isQuery || isMergeQuery) {
    // the last argument tells the worker whether this is a plain (non-merge) query
    return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, isQuery);
  }

  return 0;
}

/**
 * Process a message taken from the vnode query queue.
 *
 * A TDMT_SCH_QUERY that arrives while sync is not ready for read is redirected
 * (using the current terrno) and reported as handled. Query and merge-query
 * messages go to qWorkerProcessQueryMsg(), continuation messages to
 * qWorkerProcessCQueryMsg(); anything else is an error.
 *
 * @return the worker's result, 0 after a redirect, or TSDB_CODE_APP_ERROR for
 *         an unknown message type
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  // an older variant of this gate checked vnodeIsLeader() instead
  if (pMsg->msgType == TDMT_SCH_QUERY) {
    if (!syncIsReadyForRead(pVnode->sync)) {
      vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
      return 0;
    }
  }

  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
  int32_t     code;

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    code = qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  } else if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    code = qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  } else {
    vError("unknown msg type:%d in query queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
  }

  return code;
}

376
/**
 * Process a message taken from the vnode fetch queue.
 *
 * Fetch, table-meta, table-cfg and batch-meta requests are redirected (with
 * the current terrno) while sync is not ready for read. TMQ consume requests
 * are redirected with TSDB_CODE_SYN_RESTORING until the vnode is restored.
 * Everything else is dispatched to its handler by message type.
 *
 * @param pInfo  not referenced by this function
 * @return the handler's result, 0 after a redirect, or TSDB_CODE_APP_ERROR for
 *         an unknown message type
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);

  // message types that are only served once sync is ready for read
  // (an older variant of this gate checked vnodeIsLeader() instead)
  const bool needsReadySync = pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
                              pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META;
  if (needsReadySync && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (!pVnode->restored && pMsg->msgType == TDMT_VND_TMQ_CONSUME) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    // TDMT_SCH_CANCEL_TASK is not dispatched here (qWorkerProcessCancelMsg is disabled)
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);

    /* table metadata */
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg, true);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);

    /* tmq / stream */
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO
C
Cary Xu 已提交
440
  // blockDebugShowDataBlocks(data, __func__);
441
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
442 443
}

D
dapan1121 已提交
444
/**
 * Stamp a table-meta response with this vnode's identity: database name and
 * id, vgroup id and timestamp precision. A NULL response is ignored.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  if (pMetaRsp != NULL) {
    // NOTE(review): unbounded copy - assumes dbFName is at least as large as config.dbname; confirm
    strcpy(pMetaRsp->dbFName, pVnode->config.dbname);

    pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
    pMetaRsp->vgId = TD_VID(pVnode);
    pMetaRsp->dbId = pVnode->config.dbId;
  }
}

S
Shengliang Guan 已提交
455
/**
 * Handle a trim request: decode it, then run retention on the TSDB and, if
 * that succeeds, on the SMA store, both with the timestamp from the request.
 *
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be
 *         deserialized, otherwise the code returned by the failing retention step
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  // process: the SMA retention only runs when the TSDB retention succeeded
  int32_t code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
  if (code == 0) {
    code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
  }

  return code;
}

L
Liu Jicong 已提交
478
/**
 * Handle a drop-TTL-table request: drop the tables selected by
 * metaTtlDropTable() for the request's timestamp and, when any uid was
 * collected, tell TQ to remove them from its table list.
 *
 * `ret` is declared and initialised before the first `goto end`, so the
 * deserialize-failure path returns a defined value instead of jumping over the
 * variable's initialisation.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the uid array cannot be
 *         allocated, -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot
 *         be deserialized, otherwise the non-zero result of metaTtlDropTable()
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t           ret = 0;
  SVDropTtlTableReq ttlReq = {0};

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    ret = -1;
    goto end;
  }

  vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
  if (ret != 0) {
    goto end;
  }

  // only notify TQ when something was actually dropped
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

end:
  taosArrayDestroy(tbUids);
  return ret;
}

502
/**
 * Handle a create-super-table request: decode it, create the super table in
 * the meta store, then let the rollup-SMA layer process the creation.
 *
 * The three steps run in order and stop at the first failure; in that case the
 * current terrno is copied into pRsp->code.
 *
 * @return 0 on success, -1 when any step fails
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req; `||` short-circuits, so later steps are skipped
  // as soon as an earlier one fails
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0 ||            //
      metaCreateSTable(pVnode->pMeta, version, &req) < 0 ||  //
      tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  tDecoderClear(&coder);
  return rcode;
}

537
/**
 * Handle a batch create-table request.
 *
 * Each request in the batch is grant-checked, hash-validated against this
 * vnode and created in the meta store; a per-table SVCreateTbRsp is collected
 * for every request that reaches validation. The uids of newly created tables
 * are then pushed to TQ and to the SMA layer, and the batch response is encoded
 * into pRsp->pCont.
 *
 * Two fixes over the previous implementation:
 *  - the full table name is built with snprintf() bounded by the buffer size;
 *  - the uid store is released at _exit, so it no longer leaks on the early
 *    exits taken after it has been populated.
 *
 * @return 0 on success; -1 with terrno set on decode failure, allocation
 *         failure or a failed grant check
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    memset(&cRsp, 0, sizeof(cRsp));

    // a failed grant check aborts the whole batch
    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }

    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

    // validate hash; a mismatch only fails this table, not the batch
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) {
      // "already exists" counts as success when the request asked for IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
    // NOTE(review): as before, this path returns rcode (0) with an empty
    // response body - confirm whether it should report a failure instead
    goto _exit;
  }

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosMemoryFree(pCreateReq->comment);
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
  // released here so every exit path frees the store exactly once; pStore is
  // still NULL when no table was created
  tdUidStoreFree(pStore);
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

642
/**
 * Handle an alter-super-table request: decode it (it shares the
 * SVCreateStbReq layout) and apply it to the meta store.
 *
 * @return 0 on success; -1 when decoding fails (terrno is set to
 *         TSDB_CODE_INVALID_MSG) or when metaAlterSTable() fails (pRsp->code
 *         receives terrno)
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    // decode req failed
    terrno = TSDB_CODE_INVALID_MSG;
    rcode = -1;
  } else if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  // single cleanup point for all three outcomes
  tDecoderClear(&dc);
  return rcode;
}

671
/**
 * Handle a drop-super-table request: decode it, drop the super table from the
 * meta store (collecting the uids reported by metaDropSTable()), remove those
 * uids from the TQ table list and let the rollup-SMA layer process the drop.
 *
 * The outcome is reported through pRsp->code; the function itself always
 * returns 0.
 *
 * Two fixes over the previous implementation:
 *  - the response is tagged TDMT_VND_DROP_STB_RSP instead of the create-stb
 *    response type;
 *  - failing to allocate the uid list is reported as TSDB_CODE_OUT_OF_MEMORY
 *    instead of silently reporting success without dropping anything.
 *
 * @return always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};
  SArray      *tbUidList = NULL;

  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) {
    rcode = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  if (tbUidList) taosArrayDestroy(tbUidList);
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

714
/**
 * Handle an alter-table request: decode it, apply it to the meta store and
 * encode an SVAlterTbRsp into pRsp->pCont.
 *
 * Decode and alter failures are reported inside the encoded response
 * (SVAlterTbRsp.code) and the function still returns 0, as before. When
 * metaAlterTable() produced updated schemas, they are stamped with this vnode's
 * identity and attached to the response.
 *
 * Fixes over the previous implementation:
 *  - the response buffer is checked before it is handed to the encoder; on
 *    allocation failure pRsp carries TSDB_CODE_OUT_OF_MEMORY with no body and
 *    -1 is returned;
 *  - the decoder is cleared in one place instead of three.
 *
 * @return 0 when a response was produced, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY)
 *         when the response buffer cannot be allocated
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
  int32_t       rcode = 0;
  int32_t       ret = 0;
  bool          altered = false;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode failed
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    // process failed
    vAlterTbRsp.code = terrno;
  } else {
    altered = true;
  }
  tDecoderClear(&dc);

  // attach the refreshed table meta only after a successful alter
  if (altered && NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

  // encode the response, whatever the outcome above was
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    rcode = -1;
  } else {
    tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
    tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
    tEncoderClear(&ec);
  }

  // the schemas are freed here on every path, including a failed alter
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
  }
  return rcode;
}

764
/**
 * @brief Handle a batch drop-table request and build its response.
 *
 * Each table in the batch is dropped with metaDropTable() and gets its own SVDropTbRsp
 * entry. A "table not exist" failure counts as success when the request sets
 * igNotExists. The uids collected during the batch are then handed to
 * tqUpdateTbUidList() and tdUpdateTbUidList().
 *
 * @param pVnode  vnode the tables belong to
 * @param version version passed to metaDropTable()
 * @param pReq    encoded SVDropTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill (msgType, code, pCont, contLen)
 * @return always 0; request-level failures are reported through pRsp->code
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret = 0;
  SArray          *tbUids = NULL;
  STbUidStore     *pStore = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // report the allocation failure instead of answering "success" with nothing dropped
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
    tb_uid_t     tbUid = 0;

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, &tbUid);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  tdUpdateTbUidList(pVnode->pSma, pStore, false);

_exit:
  taosArrayDestroy(tbUids);
  tdUidStoreFree(pStore);
  tDecoderClear(&decoder);

  // encode the response (rsp.pArray is still NULL when decoding or allocation failed)
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never advertise a payload length without a payload
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  taosArrayDestroy(rsp.pArray);
  return 0;
}

829 830
/**
 * @brief Debug helper: print every row of one submit block.
 *
 * The schema is looked up with metaGetTbTSchema() using msgIter->suid and the schema
 * version of the block's first row. Each row is printed with tdSRowPrint(), prefixed by
 * `tags` and the table uid.
 *
 * @param pMeta   meta store used for the schema lookup
 * @param pBlock  submit block to print
 * @param msgIter iterator positioned on pBlock; supplies suid and uid
 * @param tags    caller-chosen prefix for every printed line
 * @return 0 when the block has no rows, -1 when no schema is found,
 *         TSDB_CODE_SUCCESS after printing
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTags[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

860
/**
 * @brief Handle a submit (insert) request and build its response.
 *
 * The request is decoded and passed to tsdbScanAndConvertSubmitMsg(). Every table block
 * is then validated before anything is written. In the second pass, tables that carry a
 * create-table request are created and the rows are inserted. An SSubmitRsp2 is encoded
 * into pRsp and the vnode insert statistics are updated on every path, including
 * failures.
 *
 * @param pVnode  target vnode
 * @param version version passed to metaCreateTable() and tsdbInsertTableData()
 * @param pReq    encoded SSubmitReq2
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill (code, pCont, contLen)
 * @return 0 on success, otherwise the error code (also stored in terrno)
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t      code = 0;
  int32_t      ret = 0;
  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
  SArray      *newTbUids = NULL;
  SEncoder     ec = {0};
  SDecoder     dc = {0};

  terrno = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode; the decoder is released on both outcomes
  tDecoderInit(&dc, pReq, len);
  ret = tDecodeSSubmitReq2(&dc, pSubmitReq);
  tDecoderClear(&dc);
  if (ret < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // check
  code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
  if (code) {
    goto _exit;
  }

  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (info.suid) {
        // re-read the info by suid; the schema version compared below then comes from this lookup
        // NOTE(review): the result of this lookup is not checked - confirm it cannot fail here
        metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // the first column must be the non-empty primary timestamp column
      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // every other column must hold as many values as the first one
      for (int32_t iCol = 1; iCol < nColData; iCol++) {
        if (aColData[iCol].nVal != aColData[0].nVal) {
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
  }

  // loop to handle
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
        code = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
      if (pCreateTbRsp == NULL) {
        // the slot is dereferenced just below; bail out rather than write through NULL
        code = TSDB_CODE_TDB_OUT_OF_MEMORY;
        goto _exit;
      }

      // create table
      if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) ==
          0) {  // create table success

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
          code = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
        }
      } else {  // create table failed
        // an already existing table is fine: the rows are inserted into it
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
      }
    }

    // insert data
    int32_t affectedRows = 0;
    code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &affectedRows);
    if (code) goto _exit;

    pSubmitRsp->affectedRows += affectedRows;
  }

  // update table uid list
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
  }

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // Never advertise a payload length without a payload. Only the reply is marked as
    // failed: `code` keeps describing what happened to the data itself.
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
    tEncodeSSubmitRsp2(&ec, pSubmitRsp);
    tEncoderClear(&ec);
  }

  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
    tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
  }

  // clear
  taosArrayDestroy(newTbUids);
  tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE);
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);

  if (code) terrno = code;

  return code;
}
1169

1170
/**
 * @brief Decode a create-TSMA request and hand it to tdProcessTSmaCreate().
 *
 * @param pVnode  vnode on which the TSMA is created
 * @param version version passed to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill; may be NULL, in which case nothing is reported back
 * @return 0 on success; -1 on failure, with the reason in terrno (and in pRsp->code
 *         when pRsp is not NULL)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  // both failure paths leave their reason in terrno; copy it before any further call
  if (pRsp) pRsp->code = terrno;
  tDecoderClear(&coder);
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
  return -1;
}
C
Cary Xu 已提交
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217

/**
 * @brief Create a TSMA from a raw request buffer; specific for smaDstVnode.
 *
 * Forwards to vnodeProcessCreateTSmaReq() with the version fixed to 1 and without a
 * response message.
 *
 * @param pVnode  vnode on which the TSMA is created
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont in bytes
 * @return int32_t the result of vnodeProcessCreateTSmaReq()
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  const int64_t version = 1;
  SRpcMsg      *pNoRsp = NULL;  // this entry point has no response to fill

  return vnodeProcessCreateTSmaReq(pVnode, version, pCont, contLen, pNoRsp);
}
1218 1219 1220 1221 1222 1223 1224 1225 1226

/**
 * @brief Acknowledge an alter-replica confirm message.
 *
 * Nothing is changed on the vnode: the message is logged and pRsp is filled with an
 * empty, successful TDMT_VND_ALTER_CONFIRM_RSP.
 *
 * @param pVnode  vnode that received the message (used for logging only)
 * @param version unused
 * @param pReq    unused
 * @param len     unused
 * @param pRsp    response to fill
 * @return always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // empty payload, success status
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
1228

1229
/**
 * @brief Placeholder handler for an alter-hashrange message.
 *
 * Only logs the message for now; pRsp is left untouched.
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  /*
   * todo:
   *   1. stop work
   *   2. adjust hash range / compact / remove wals / rename vgroups
   *   3. reload sync
   */
  return 0;
}
H
Hongze Cheng 已提交
1238

1239
/**
 * @brief Apply an alter-vnode-config request to a running vnode.
 *
 * Each setting is compared with the current configuration and only applied when it
 * differs. WAL settings are pushed with walAlter() and the keep settings with
 * tsdbSetKeepCfg() (the latter is skipped when VND_IS_RSMA(pVnode) is true).
 * pRsp is not touched.
 *
 * @param pVnode  vnode to reconfigure
 * @param version unused
 * @param pReq    serialized SAlterVnodeConfigReq
 * @param len     length of pReq in bytes
 * @param pRsp    unused
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
 *         or the errno value observed right after metaAlterCache() failed
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  bool walChanged = false;
  bool keepChanged = false;

  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d",
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
        req.walFsyncPeriod, req.walLevel);

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }

  // req.buffer is scaled by 1024 * 1024 before it is compared with and stored in szBuf
  int64_t newSzBuf = req.buffer * 1024LL * 1024LL;
  if (pVnode->config.szBuf != newSzBuf) {
    // both arguments are passed as int64_t so that they match the PRId64 conversions
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode),
          (int64_t)pVnode->config.szBuf, newSzBuf);
    pVnode->config.szBuf = newSzBuf;
  }

  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
      // capture errno before logging: the logging call may overwrite it
      // NOTE(review): the rest of this file reports failures through terrno - confirm
      // that metaAlterCache() really leaves its reason in errno
      int32_t savedErrno = errno;
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
             pVnode->config.szCache, req.pages, tstrerror(savedErrno));
      return savedErrno;
    } else {
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
      pVnode->config.szCache = req.pages;
    }
  }

  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
  }

  // wal settings
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
    walChanged = true;
  }

  // keep settings
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
    keepChanged = true;
  }

  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
    keepChanged = true;
  }

  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
    keepChanged = true;
  }

  if (walChanged) {
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  // the stored keep values are always updated, but only a non-RSMA vnode pushes them to tsdb
  if (keepChanged && !VND_IS_RSMA(pVnode)) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
  }

  return 0;
}

1325 1326 1327 1328 1329 1330
/**
 * @brief Handle a batch delete request: delete a time range from each named table.
 *
 * Every entry names a table and a [startTs, endTs] range. Tables that cannot be found
 * by name are skipped, and a failing tsdbDeleteTableData() is logged (terrno is set)
 * without stopping the batch. pRsp is not touched.
 *
 * @param pVnode  vnode holding the tables
 * @param version version passed to tsdbDeleteTableData()
 * @param pReq    encoded SBatchDeleteReq
 * @param len     length of pReq in bytes
 * @param pRsp    unused
 * @return 0 once the batch has been walked; -1 with terrno = TSDB_CODE_INVALID_MSG when
 *         the request cannot be decoded
 */
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SBatchDeleteReq deleteReq = {0};
  SDecoder        decoder = {0};
  SMetaReader     mr = {0};

  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSBatchDeleteReq(&decoder, &deleteReq) < 0) {
    // do not walk a partially decoded request
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    taosArrayDestroy(deleteReq.deleteReqs);
    return -1;
  }

  metaReaderInit(&mr, pVnode->pMeta, 0);

  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
      vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
      continue;
    }

    int64_t uid = mr.me.uid;

    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

    // release the reader's entry decoder before the next lookup reuses the reader
    tDecoderClear(&mr.coder);
  }

  metaReaderClear(&mr);
  // the request decoder is released only after the loop, once the decoded entries are no longer read
  tDecoderClear(&decoder);
  taosArrayDestroy(deleteReq.deleteReqs);
  return 0;
}

H
Hongze Cheng 已提交
1359 1360 1361 1362 1363
/**
 * @brief Handle a delete request: delete [skey, ekey] from every listed table.
 *
 * The request is decoded as an SDeleteRes. tsdbDeleteTableData() is called once per uid
 * in its uidList and the first failure stops the loop. On success an SVDeleteRsp with
 * the affectedRows taken from the request is encoded into pRsp.
 *
 * @param pVnode  vnode holding the tables
 * @param version version passed to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill (msgType, code, pCont, contLen)
 * @return 0 on success, otherwise the error code of the step that failed
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // the decoder is initialized first so that the common exit can always clear it
  tDecoderInit(pCoder, pReq, len);

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) goto _exit;
  }

  // encode the response; kept in its own scope so no jump to _exit skips an initializer
  {
    SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
    SEncoder    ec = {0};
    int32_t     ret = 0;

    tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
    pRsp->pCont = rpcMallocCont(pRsp->contLen);
    if (pRsp->pCont == NULL) {
      // never advertise a payload length without a payload
      pRsp->contLen = 0;
      pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
      tEncodeSVDeleteRsp(&ec, &rsp);
      tEncoderClear(&ec);
    }
  }

_exit:
  // common cleanup for success and failure; uidList is NULL only when its allocation failed
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}