vnodeSvr.c 30.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
27 28
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
29
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
S
Shengliang Guan 已提交
30
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
31
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
32

33
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
34
  int32_t  code = 0;
H
Hongze Cheng 已提交
35
  SDecoder dc = {0};
H
Hongze Cheng 已提交
36

H
Hongze Cheng 已提交
37 38 39 40 41 42 43 44 45 46 47
  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
H
Hongze Cheng 已提交
48
        char    *name = NULL;
H
Hongze Cheng 已提交
49 50 51
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
H
Hongze Cheng 已提交
52
        tDecodeCStr(&dc, &name);
H
Hongze Cheng 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
H
Hongze Cheng 已提交
67
      tb_uid_t       uid;
H
Hongze Cheng 已提交
68 69 70 71 72 73 74 75

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
76
          char *name = NULL;
H
Hongze Cheng 已提交
77

H
Hongze Cheng 已提交
78 79 80 81
          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
H
Hongze Cheng 已提交
82 83 84 85 86 87
          tDecodeCStr(&dc, &name);

          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
H
Hongze Cheng 已提交
88
          *(int64_t *)(dc.data + dc.pos) = uid;
H
Hongze Cheng 已提交
89
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
H
Hongze Cheng 已提交
90
          pBlock->uid = htobe64(uid);
H
Hongze Cheng 已提交
91 92 93 94 95 96 97

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
H
Hongze Cheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
    case TDMT_VND_DELETE: {
      int32_t     size;
      int32_t     ret;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
H
Hongze Cheng 已提交
127 128 129
    default:
      break;
  }
H
Hongze Cheng 已提交
130

S
Shengliang Guan 已提交
131
  return code;
H
Hongze Cheng 已提交
132 133 134 135

_err:
  vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
136 137
}

138
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
139 140 141 142
  void   *ptr = NULL;
  void   *pReq;
  int32_t len;
  int32_t ret;
H
Hongze Cheng 已提交
143

144
  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
145
         version);
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147
  pVnode->state.applied = version;
H
Hongze Cheng 已提交
148
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
H
Hongze Cheng 已提交
149

H
Hongze Cheng 已提交
150 151 152
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
H
Hongze Cheng 已提交
153 154

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
155
    /* META */
H
Hongze Cheng 已提交
156
    case TDMT_VND_CREATE_STB:
H
Hongze Cheng 已提交
157
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
158
      break;
H
Hongze Cheng 已提交
159
    case TDMT_VND_ALTER_STB:
H
Hongze Cheng 已提交
160
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
161
      break;
H
Hongze Cheng 已提交
162
    case TDMT_VND_DROP_STB:
H
Hongze Cheng 已提交
163
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
164
      break;
H
Hongze Cheng 已提交
165
    case TDMT_VND_CREATE_TABLE:
H
Hongze Cheng 已提交
166
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
167 168
      break;
    case TDMT_VND_ALTER_TABLE:
H
Hongze Cheng 已提交
169
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
170
      break;
H
Hongze Cheng 已提交
171
    case TDMT_VND_DROP_TABLE:
H
Hongze Cheng 已提交
172
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
173
      break;
174
    case TDMT_VND_DROP_TTL_TABLE:
175
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
176
      break;
S
Shengliang Guan 已提交
177 178 179 180
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
181
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
182
      break;
H
Hongze Cheng 已提交
183
    /* TSDB */
H
Hongze Cheng 已提交
184
    case TDMT_VND_SUBMIT:
H
Hongze Cheng 已提交
185
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
186
      break;
D
dapan1121 已提交
187
    case TDMT_VND_DELETE:
H
Hongze Cheng 已提交
188
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
D
dapan1121 已提交
189
      break;
H
Hongze Cheng 已提交
190
    /* TQ */
L
Liu Jicong 已提交
191 192 193
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
194
        goto _err;
L
Liu Jicong 已提交
195 196
      }
      break;
L
Liu Jicong 已提交
197 198
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
199 200 201 202 203 204 205
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
L
Liu Jicong 已提交
206 207
      }
      break;
208
    case TDMT_STREAM_TASK_DEPLOY: {
L
Liu Jicong 已提交
209 210
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
211
        goto _err;
H
Hongze Cheng 已提交
212 213
      }
    } break;
L
Liu Jicong 已提交
214
    case TDMT_STREAM_TASK_DROP: {
L
Liu Jicong 已提交
215 216 217 218
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
219 220 221
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
S
Shengliang Guan 已提交
222
    case TDMT_VND_ALTER_HASHRANGE:
223
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
S
Shengliang Guan 已提交
224
      break;
225
    case TDMT_VND_ALTER_CONFIG:
226
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
S
Shengliang Guan 已提交
227
      break;
H
Hongze Cheng 已提交
228 229
    case TDMT_VND_COMMIT:
      goto _do_commit;
H
Hongze Cheng 已提交
230 231 232 233 234
    default:
      ASSERT(0);
      break;
  }

235
  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
H
Hongze Cheng 已提交
236

237
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
S
Shengliang Guan 已提交
238
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
239 240 241
    return -1;
  }

H
Hongze Cheng 已提交
242
  // commit if need
H
Hongze Cheng 已提交
243
  if (vnodeShouldCommit(pVnode)) {
H
Hongze Cheng 已提交
244
  _do_commit:
S
Shengliang Guan 已提交
245
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
H
Hongze Cheng 已提交
246 247 248 249 250
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
H
Hongze Cheng 已提交
251 252 253
  }

  return 0;
H
Hongze Cheng 已提交
254 255

_err:
S
Shengliang Guan 已提交
256
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
257 258
         tstrerror(terrno), version);
  return -1;
H
Hongze Cheng 已提交
259 260
}

261
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan1121 已提交
262
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
D
dapan1121 已提交
263 264 265 266 267 268 269
    return 0;
  }

  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
}

int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
270
  vTrace("message in vnode query queue is processing");
D
dapan1121 已提交
271
  if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
272 273 274 275
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

276
  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Hongze Cheng 已提交
277
  switch (pMsg->msgType) {
D
dapan1121 已提交
278
    case TDMT_SCH_QUERY:
D
dapan1121 已提交
279
    case TDMT_SCH_MERGE_QUERY:
D
dapan1121 已提交
280
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
281
    case TDMT_SCH_QUERY_CONTINUE:
D
dapan1121 已提交
282
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
283 284 285 286 287 288
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

289
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
H
Hongze Cheng 已提交
290
  vTrace("message in fetch queue is processing");
M
Minglei Jin 已提交
291 292 293
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG) &&
      !vnodeIsLeader(pVnode)) {
294 295 296 297
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

H
Hongze Cheng 已提交
298 299
  char   *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
C
Cary Xu 已提交
300

H
Hongze Cheng 已提交
301
  switch (pMsg->msgType) {
D
dapan1121 已提交
302
    case TDMT_SCH_FETCH:
D
dapan1121 已提交
303
    case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
304
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
305
    case TDMT_SCH_FETCH_RSP:
D
dapan1121 已提交
306
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
307
    case TDMT_SCH_CANCEL_TASK:
D
dapan1121 已提交
308
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
309
    case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
310
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
311
    case TDMT_SCH_QUERY_HEARTBEAT:
D
dapan1121 已提交
312
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
313 314
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
D
dapan1121 已提交
315 316
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
H
Hongze Cheng 已提交
317 318
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
319 320 321
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
322
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
323
    case TDMT_STREAM_TASK_RECOVER:
L
Liu Jicong 已提交
324
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
325 326
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
327
    case TDMT_STREAM_TASK_DISPATCH_RSP:
L
Liu Jicong 已提交
328
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
329
    case TDMT_STREAM_TASK_RECOVER_RSP:
L
Liu Jicong 已提交
330
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
331 332
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
333 334 335 336 337 338 339 340 341 342
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
343
  blockDebugShowDataBlocks(data, __func__);
344
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
345 346
}

D
dapan1121 已提交
347 348 349 350 351 352 353
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

S
Shengliang Guan 已提交
354
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
355
  int32_t     code = 0;
S
Shengliang Guan 已提交
356 357 358
  SVTrimDbReq trimReq = {0};

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
H
Hongze Cheng 已提交
359 360 361 362 363

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
S
Shengliang Guan 已提交
364 365
  }

H
Hongze Cheng 已提交
366 367 368 369 370 371
  // process
  code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
  if (code) goto _exit;

_exit:
  return code;
S
Shengliang Guan 已提交
372 373
}

L
Liu Jicong 已提交
374
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
375 376 377
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
378 379 380 381 382 383 384 385
  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
L
Liu Jicong 已提交
386
  if (ret != 0) {
387 388
    goto end;
  }
L
Liu Jicong 已提交
389
  if (taosArrayGetSize(tbUids) > 0) {
wmmhello's avatar
wmmhello 已提交
390 391
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }
392 393 394 395 396 397

end:
  taosArrayDestroy(tbUids);
  return ret;
}

398
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
399
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
400
  SDecoder       coder;
H
Hongze Cheng 已提交
401

H
Hongze Cheng 已提交
402 403 404 405 406 407
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
408
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
409 410

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
411 412
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
413 414
  }

H
Hongze Cheng 已提交
415
  if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
H
Hongze Cheng 已提交
416 417
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
418 419
  }

420
  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
421 422 423
    pRsp->code = terrno;
    goto _err;
  }
C
Cary Xu 已提交
424

M
Minglei Jin 已提交
425 426
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
H
Hongze Cheng 已提交
427
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
428
  return 0;
H
Hongze Cheng 已提交
429 430

_err:
M
Minglei Jin 已提交
431 432
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
H
Hongze Cheng 已提交
433
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
434
  return -1;
H
Hongze Cheng 已提交
435 436
}

437
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
438
  SDecoder           decoder = {0};
439
  int32_t            rcode = 0;
H
Hongze Cheng 已提交
440 441
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
442 443 444
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
445
  STbUidStore       *pStore = NULL;
446
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
447 448

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
449 450 451
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
452

H
Hongze Cheng 已提交
453
  // decode
H
Hongze Cheng 已提交
454 455
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
456 457 458 459
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
460

H
Hongze Cheng 已提交
461
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
462 463
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
464 465 466 467 468
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
469
  // loop to create table
470
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
471
    pCreateReq = req.pReqs + iReq;
H
Hongze Cheng 已提交
472 473 474 475 476 477 478 479 480 481

    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
H
Hongze Cheng 已提交
482
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
H
Hongze Cheng 已提交
483 484 485 486 487
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
488
    } else {
H
Hongze Cheng 已提交
489
      cRsp.code = TSDB_CODE_SUCCESS;
490
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
491
      taosArrayPush(tbUids, &pCreateReq->uid);
H
Hongze Cheng 已提交
492
    }
H
Hongze Cheng 已提交
493 494

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
495 496
  }

497
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
498 499
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
500

H
Hongze Cheng 已提交
501
  // prepare rsp
H
Hongze Cheng 已提交
502 503
  SEncoder encoder = {0};
  int32_t  ret = 0;
wafwerar's avatar
wafwerar 已提交
504
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
505 506 507 508 509 510
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
511 512
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
H
Hongze Cheng 已提交
513

H
Hongze Cheng 已提交
514
_exit:
wmmhello's avatar
wmmhello 已提交
515 516 517 518
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
H
Hongze Cheng 已提交
519
  taosArrayDestroy(rsp.pArray);
520
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
521 522
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
523
  return rcode;
H
Hongze Cheng 已提交
524 525
}

526
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
542
  }
H
Hongze Cheng 已提交
543 544 545 546 547 548 549 550 551

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
552 553 554
  return 0;
}

555
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
556
  SVDropStbReq req = {0};
557
  int32_t      rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
558
  SDecoder     decoder = {0};
H
Hongze Cheng 已提交
559 560 561 562 563 564

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
565 566
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
567 568 569 570 571
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
572 573 574 575
  if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
576

577 578 579 580 581
  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
582 583 584
  // return rsp
_exit:
  pRsp->code = rcode;
H
Hongze Cheng 已提交
585
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
586 587 588
  return 0;
}

589
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
D
dapan1121 已提交
590 591 592
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
593 594
  int32_t       rcode = 0;
  int32_t       ret;
D
dapan1121 已提交
595 596
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
H
Hongze Cheng 已提交
597 598 599 600 601 602 603 604 605 606

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
607
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
608
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
609 610
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
611 612 613
  }

  // process
D
dapan1121 已提交
614
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
615
    vAlterTbRsp.code = terrno;
H
Hongze Cheng 已提交
616
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
617 618
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
619 620
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
621

D
dapan1121 已提交
622 623 624 625 626
  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

H
Hongze Cheng 已提交
627 628 629 630 631 632
_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
H
Hongze Cheng 已提交
633 634 635
  return 0;
}

636
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
637 638
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
639
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
640
  SEncoder         encoder = {0};
641
  int32_t          ret;
642
  SArray          *tbUids = NULL;
H
Hongze Cheng 已提交
643

H
Hongze Cheng 已提交
644
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
645 646 647
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
648 649

  // decode req
H
Hongze Cheng 已提交
650 651
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
652 653 654 655 656
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
657 658

  // process req
659
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
H
Hongze Cheng 已提交
660
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
661 662
  if (tbUids == NULL || rsp.pArray == NULL) goto _exit;

663
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
664 665
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
H
Hongze Cheng 已提交
666

H
Hongze Cheng 已提交
667
    /* code */
668
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
H
Hongze Cheng 已提交
669
    if (ret < 0) {
H
Hongze Cheng 已提交
670 671 672 673 674
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
675
    } else {
H
Hongze Cheng 已提交
676
      dropTbRsp.code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
677 678 679 680 681
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

682 683
  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

H
Hongze Cheng 已提交
684
_exit:
685
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
686
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
687 688 689 690 691
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
692 693 694
  return 0;
}

695 696
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
D
dapan 已提交
697 698 699 700
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;
C
Cary Xu 已提交
701
  int32_t        rv = -1;
D
dapan 已提交
702 703 704

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
C
Cary Xu 已提交
705
  if (!pSchema || (suid != msgIter->suid) || rv != TD_ROW_SVER(blkIter.row)) {
D
dapan 已提交
706 707 708
    if (pSchema) {
      taosMemoryFreeClear(pSchema);
    }
C
Cary Xu 已提交
709
    pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
D
dapan 已提交
710 711
    if (pSchema) {
      suid = msgIter->suid;
C
Cary Xu 已提交
712
      rv = TD_ROW_SVER(blkIter.row);
D
dapan 已提交
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
    }
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

730
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
C
Cary Xu 已提交
731 732 733 734 735 736 737 738 739
  ASSERT(pMsg != NULL);
  SSubmitMsgIter msgIter = {0};
  SMeta         *pMeta = pVnode->pMeta;
  SSubmitBlk    *pBlock = NULL;

  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
  while (true) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
    if (pBlock == NULL) break;
H
Hongze Cheng 已提交
740

D
dapan 已提交
741 742
    vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
  }
C
Cary Xu 已提交
743 744 745 746

  return 0;
}

747
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
748
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
749
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
750 751 752 753
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
754
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
755
  int32_t        nRows;
H
Hongze Cheng 已提交
756 757
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
758
  SArray        *newTbUids = NULL;
759
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
760 761

  pRsp->code = 0;
C
Cary Xu 已提交
762

C
Cary Xu 已提交
763 764 765 766
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
767 768 769 770 771
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
772
  // handle the request
H
Hongze Cheng 已提交
773 774 775
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
776 777
  }

C
Cary Xu 已提交
778 779
  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
780 781 782 783 784
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
785
  for (;;) {
H
Hongze Cheng 已提交
786 787 788
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
789 790
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
791 792
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
793 794
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
795 796
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
797
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
798
        tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
799
        taosArrayDestroy(createTbReq.ctb.tagName);
H
Hongze Cheng 已提交
800 801 802 803 804
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
805
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
806
          tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
807
          taosArrayDestroy(createTbReq.ctb.tagName);
H
Hongze Cheng 已提交
808 809 810
          goto _exit;
        }
      }
811
      taosArrayPush(newTbUids, &createTbReq.uid);
H
Hongze Cheng 已提交
812

H
Hongze Cheng 已提交
813
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
814
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
D
dapan 已提交
815
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
816

H
Hongze Cheng 已提交
817 818 819 820 821 822 823
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

wmmhello's avatar
wmmhello 已提交
824
#ifdef TD_DEBUG_PRINT_ROW
D
dapan 已提交
825
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
wmmhello's avatar
wmmhello 已提交
826
#endif
H
Hongze Cheng 已提交
827
      tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
828
      taosArrayDestroy(createTbReq.ctb.tagName);
D
dapan1121 已提交
829 830 831
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
832 833
    }

H
Hongze Cheng 已提交
834
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
835
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
836 837
    }

H
Hongze Cheng 已提交
838 839 840
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
841
  }
842
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
843

H
Hongze Cheng 已提交
844
_exit:
845
  taosArrayDestroy(newTbUids);
H
Hongze Cheng 已提交
846 847 848 849 850 851 852 853
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
854
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
855 856 857
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
858

859
  // TODO: the partial success scenario and the error case
H
Hongze Cheng 已提交
860 861
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
862
  // TODO: refactor
C
Cary Xu 已提交
863
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
L
Liu Jicong 已提交
864
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
865
  }
C
Cary Xu 已提交
866

H
Hongze Cheng 已提交
867
  return 0;
L
Liu Jicong 已提交
868
}
869

870
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
871
  SVCreateTSmaReq req = {0};
H
Hongze Cheng 已提交
872
  SDecoder        coder;
873

C
Cary Xu 已提交
874 875 876 877 878 879
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }
880 881 882 883 884

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
C
Cary Xu 已提交
885 886
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
887
    goto _err;
888
  }
C
Cary Xu 已提交
889

C
Cary Xu 已提交
890
  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
C
Cary Xu 已提交
891
    if (pRsp) pRsp->code = terrno;
892
    goto _err;
893
  }
C
Cary Xu 已提交
894

895
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
896
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
C
Cary Xu 已提交
897
         req.indexName, req.indexUid, version, req.tableUid);
H
Hongze Cheng 已提交
898
  return 0;
899 900 901

_err:
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
902
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
C
Cary Xu 已提交
903
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
904
  return -1;
L
Liu Jicong 已提交
905
}
C
Cary Xu 已提交
906 907 908 909 910 911 912 913 914 915 916 917

/**
 * @brief specific for smaDstVnode
 *
 * @param pVnode
 * @param pCont
 * @param contLen
 * @return int32_t
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
918 919 920 921 922 923 924 925 926

static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return 0;
927
}
S
Shengliang Guan 已提交
928

929
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
930 931
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

932 933
  // todo
  // 1. stop work
S
Shengliang Guan 已提交
934 935 936
  // 2. adjust hash range / compact / remove wals / rename vgroups
  // 3. reload sync
  return 0;
M
Minghao Li 已提交
937
}
H
Hongze Cheng 已提交
938

939 940 941 942 943 944 945 946 947
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
        alterReq.cacheLastSize);
948 949 950 951 952
  if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
    pVnode->config.cacheLastSize = alterReq.cacheLastSize;
    // TODO: save config
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }
953 954 955
  return 0;
}

H
Hongze Cheng 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) goto _err;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;

_err:
  return code;
M
Minglei Jin 已提交
982
}