vnodeSvr.c 30.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
27 28
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
29
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
S
Shengliang Guan 已提交
30
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
31
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
32

33
/*
 * Pre-process a write message in place before it is processed.
 *
 *  - TDMT_VND_CREATE_TABLE: for every request in the batch, writes a freshly
 *    generated uid and the current timestamp into the message buffer at the
 *    decoder position that follows the table name.
 *  - TDMT_VND_SUBMIT: for every block carrying a schema (auto-create), looks
 *    the table up by name, generates a uid when the lookup yields 0, and
 *    patches uid/ctime into the embedded create request and the block header.
 *  - TDMT_VND_DELETE: runs the delete through the query worker and replaces
 *    pMsg->pCont with the encoded SDeleteRes.
 *
 * Returns 0 on success, or a non-zero error code (also logged) on failure.
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;  // stays 0 (no iterations) if the count cannot be decoded

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
        tDecodeCStr(&dc, &name);
        // patch uid and ctime directly into the message buffer
        // NOTE(review): no bounds/alignment check on these in-place stores -- confirm the
        // encoder always reserves 16 bytes here.
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
          tDecodeCStr(&dc, &name);

          // reuse the uid of an existing table, otherwise generate a new one
          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      int32_t     size = 0;
      int32_t     ret = 0;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // do not write the header through a NULL buffer; keep the original message intact
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      // swap the request body for the encoded delete result
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

138
/*
 * Apply one write message to the vnode.
 *
 * Records `version` and the apply term in pVnode->state, dispatches on
 * pMsg->msgType to the matching handler, pushes the message to TQ and commits
 * when vnodeShouldCommit() says so (TDMT_VND_COMMIT jumps straight to the
 * commit step, skipping the TQ push).
 *
 * Returns 0 on success and -1 when a handler or the TQ push fails.
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler takes the whole message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    // NOTE(review): the results of the three alter handlers below are ignored -- confirm this is intended
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

261
/*
 * Pre-process a message from the query queue.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to the query
 * worker; every other message type is accepted untouched (returns 0).
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/*
 * Process a message taken from the vnode query queue.
 *
 * A TDMT_SCH_QUERY arriving at a non-leader vnode is redirected and reported
 * as handled (0). Otherwise query / merge-query and query-continue messages
 * are handed to the query worker; any other type is logged and rejected with
 * TSDB_CODE_VND_APP_ERROR.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_SCH_QUERY && !vnodeIsLeader(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

  SReadHandle readHandle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }
  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

289
/*
 * Process a message taken from the vnode fetch queue.
 *
 * Fetch, table-meta and table-cfg requests arriving at a non-leader vnode are
 * redirected and reported as handled (0). All other supported types are
 * dispatched to the query worker, the vnode meta getters or TQ; the handler's
 * return value is passed through. Unknown types are logged and rejected with
 * TSDB_CODE_VND_APP_ERROR.
 *
 * pInfo is only read for its workerId, which is passed to the TQ poll handler.
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG) &&
      !vnodeIsLeader(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
/*
 * Debug-prints the given data blocks and forwards them to the time-range SMA
 * insert path of the vnode passed as an opaque pointer.
 */
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO
  SVnode *pNode = (SVnode *)pVnode;

  blockDebugShowDataBlocks(data, __func__);
  // the array handle is passed through as an opaque byte pointer
  tdProcessTSmaInsert(pNode->pSma, smaId, (const char *)data);
}

D
dapan1121 已提交
347 348 349 350 351 352 353
/*
 * Fill the vnode-level fields of a table meta response (database id and name,
 * vgroup id, timestamp precision) from the vnode's configuration.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  // NOTE(review): unbounded copy -- presumably dbFName is at least as large as config.dbname; confirm
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

S
Shengliang Guan 已提交
354
/*
 * Handle TDMT_VND_TRIM: decode the trim request and run TSDB retention with
 * the timestamp it carries.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be
 * decoded, or the code returned by tsdbDoRetention(). `version` and `pRsp`
 * are not used.
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  // log only after decoding, so the real timestamp is reported instead of the zero-initialised one
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  // process
  return tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
}

L
Liu Jicong 已提交
374
/*
 * Handle TDMT_VND_DROP_TTL_TABLE: drop the tables selected by
 * metaTtlDropTable() for the request's timestamp and tell TQ which table uids
 * were removed.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be
 * allocated, -1 (with terrno = TSDB_CODE_INVALID_MSG) when the request cannot
 * be decoded, or the non-zero result of metaTtlDropTable(). `version` and
 * `pRsp` are not used.
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // declared before any `goto end` so the decode-failure path returns a defined value
  int32_t ret = -1;

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

end:
  taosArrayDestroy(tbUids);
  return ret;
}

398
/*
 * Handle TDMT_VND_CREATE_STB: decode the request, create the super table in
 * meta and run the rollup-SMA create hook.
 *
 * pRsp is reset to an empty TDMT_VND_CREATE_STB_RSP; when any step fails
 * pRsp->code is set to terrno and -1 is returned, otherwise 0. The decoded
 * schema buffers and the decoder are released on every path.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec;
  int32_t        result = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  // the steps run in order; the first failure stops the chain
  if (tDecodeSVCreateStbReq(&dec, &stbReq) < 0 ||
      metaCreateSTable(pVnode->pMeta, version, &stbReq) < 0 ||
      tdProcessRSmaCreate(pVnode->pSma, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    result = 0;
  }

  // shared cleanup for success and failure
  taosMemoryFree(stbReq.schemaRow.pSchema);
  taosMemoryFree(stbReq.schemaTag.pSchema);
  tDecoderClear(&dec);
  return result;
}

437
/*
 * Handle TDMT_VND_CREATE_TABLE: decode a batch of create-table requests,
 * create each table and encode one SVCreateTbRsp per request into pRsp.
 *
 * Per-table failures (hash mismatch, meta errors) are reported inside the
 * batch response and do not fail the call; "already exists" is treated as
 * success when TD_CREATE_IF_NOT_EXISTS is set. The uids of newly created
 * tables are passed to TQ and to the SMA uid store.
 *
 * Returns 0 on success, or -1 with terrno set when the request cannot be
 * decoded or memory cannot be allocated.
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // initialised up front: every `goto _exit` ends in tEncoderClear(&encoder)
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash (bounded format: the table name comes from the request)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;  // do not advertise a body that was never allocated
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

526
/*
 * Handle TDMT_VND_ALTER_STB: decode the request (same layout as the
 * create-stb request) and apply it through metaAlterSTable().
 *
 * pRsp is reset to an empty TDMT_VND_ALTER_STB_RSP. Returns 0 on success and
 * -1 on failure: a decode failure sets terrno to TSDB_CODE_INVALID_MSG, a
 * meta failure copies terrno into pRsp->code.
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder = {0};
  int32_t        result = -1;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) < 0) {
    // decode req failed
    terrno = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterSTable(pVnode->pMeta, version, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    result = 0;
  }

  // single release point for the decoder
  tDecoderClear(&decoder);
  return result;
}

555
/*
 * Handle TDMT_VND_DROP_STB: decode the request, drop the super table from
 * meta and run the rollup-SMA drop hook.
 *
 * The outcome is reported through pRsp->code (TSDB_CODE_SUCCESS,
 * TSDB_CODE_INVALID_MSG on decode failure, or terrno from the failing step);
 * the function itself always returns 0.
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // respond with the drop-stb response type (was copy-pasted as the create-stb one)
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

589
/*
 * Handle TDMT_VND_ALTER_TABLE: decode the request, apply it through
 * metaAlterTable() and encode an SVAlterTbRsp into pRsp.
 *
 * Decode and meta failures are reported inside the encoded response
 * (vAlterTbRsp.code) and the function still returns 0. When metaAlterTable()
 * hands back schemas, the vnode-level meta fields are filled in and the meta
 * is attached to the response.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY only when the response
 * buffer cannot be allocated.
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  int32_t       ret = 0;
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    goto _exit;
  }

  // process
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    vAlterTbRsp.code = terrno;
    tDecoderClear(&dc);
    goto _exit;
  }
  tDecoderClear(&dc);

  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never encode into a NULL buffer
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  // NOTE(review): vMetaRsp.pSchemas is not released here -- confirm who owns it
  return 0;
}

636
/*
 * Handle TDMT_VND_DROP_TABLE: decode a batch of drop-table requests, drop
 * each table and encode one SVDropTbRsp per request into pRsp.
 *
 * Per-table failures are reported inside the batch response; "table does not
 * exist" counts as success when the request sets igNotExists. The uids
 * collected by metaDropTable() are passed to TQ. Decode and allocation
 * failures are reported through pRsp->code and a (possibly empty) response is
 * still encoded.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the response
 * buffer cannot be allocated.
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret = 0;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // report the failure instead of answering "success" with an empty batch
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never encode into a NULL buffer
    pRsp->contLen = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    taosArrayDestroy(rsp.pArray);
    return -1;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  taosArrayDestroy(rsp.pArray);
  return 0;
}

696 697
/*
 * Debug helper: print every row of one submit block.
 *
 * Looks the schema up via metaGetTbTSchema() using the iterator's suid and the
 * schema version of the block's first row, then prints each row prefixed with
 * "<tags>: uid <uid> ".
 *
 * Returns 0 for an empty block, -1 when no schema is found, and
 * TSDB_CODE_SUCCESS after printing.
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTags[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

731
/*
 * Debug helper: walk every block of a submit request and print its rows.
 *
 * Returns -1 when the message iterator cannot be initialised or advanced,
 * otherwise 0. The result of printing an individual block is ignored.
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);
  SSubmitMsgIter iter = {0};
  SMeta         *pVnodeMeta = pVnode->pMeta;
  SSubmitBlk    *pBlk = NULL;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) {
      return -1;
    }
    if (pBlk == NULL) {
      break;  // no more blocks
    }

    vnodeDebugPrintSingleSubmitMsg(pVnodeMeta, pBlk, &iter, tags);
  }

  return 0;
}

748
/**
 * @brief Apply a submit (row insert) request to the vnode and build the encoded SSubmitRsp reply.
 *
 * Steps, in order:
 *   1. scan/convert the message via tsdbScanAndConvertSubmitMsg();
 *   2. iterate the submit blocks; a block with schemaLen > 0 carries an encoded SVCreateTbReq, and the
 *      table is created first (a TSDB_CODE_TDB_TABLE_ALREADY_EXIST failure is tolerated);
 *   3. insert each block with tsdbInsertTableData(), recording one SSubmitBlkRsp per block;
 *   4. pass the uids collected from the create-table requests to tqUpdateTbUidList();
 *   5. encode the response into pRsp->pCont and, when neither terrno nor pRsp->code holds an error,
 *      forward the request to tdProcessRSmaSubmit().
 *
 * @param pVnode  vnode the request is applied to
 * @param version version passed through to metaCreateTable() and tsdbInsertTableData()
 * @param pReq    request buffer, interpreted as SSubmitReq
 * @param len     length of pReq (not used by this function)
 * @param pRsp    response to fill in: code, pCont (allocated with rpcMallocCont()) and contLen
 * @return always 0; failures are reported through pRsp->code and the per-block response codes
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SDecoder       decoder = {0};
  SEncoder       encoder = {0};
  SArray        *newTbUids = NULL;
  int32_t        tsize = 0;
  int32_t        ret = 0;

  terrno = TSDB_CODE_SUCCESS;
  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
  // both arrays are written to inside the loop, so both allocations must have succeeded
  if (!submitRsp.pArray || !newTbUids) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    // a failing iterator means the message is malformed; do not treat it as "end of message"
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      // zeroed per block so a failed decode never sees pointers left over from a previous block
      SVCreateTbReq createTbReq = {0};

      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        taosArrayDestroy(createTbReq.ctb.tagName);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          submitBlkRsp.code = terrno;
          // the block response is dropped on this path, so surface the error on the reply itself
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          taosArrayDestroy(createTbReq.ctb.tagName);
          goto _exit;
        }
      }
      taosArrayPush(newTbUids, &createTbReq.uid);

      submitBlkRsp.uid = createTbReq.uid;

      // room for "<dbname>." + "<table name>" + terminating NUL
      size_t fnameLen = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameLen);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        tDecoderClear(&decoder);
        taosArrayDestroy(createTbReq.ctb.tagName);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameLen, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

#ifdef TD_DEBUG_PRINT_ROW
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
      tDecoderClear(&decoder);
      taosArrayDestroy(createTbReq.ctb.tagName);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    // an insert failure is recorded on the block only; remaining blocks are still processed
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);

_exit:
  taosArrayDestroy(newTbUids);

  // encode whatever was collected so far, even on the error paths
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont != NULL) {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  } else {
    // no buffer to encode into: send an empty body and keep any earlier error code
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // the per-block table names are owned by this function; release them after encoding
  int32_t nBlkRsp = (int32_t)taosArrayGetSize(submitRsp.pArray);
  for (int32_t i = 0; i < nBlkRsp; i++) {
    SSubmitBlkRsp *pBlkRsp = (SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i);
    taosMemoryFree(pBlkRsp->tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
  }

  return 0;
}
870

871
/**
 * @brief Decode a create-TSMA request and apply it through tdProcessTSmaCreate().
 *
 * @param pVnode  vnode the request is applied to
 * @param version version passed through to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    optional response; when non-NULL it is reset to an empty TDMT_VND_CREATE_SMA_RSP and
 *                its code is set to terrno on failure. May be NULL.
 * @return 0 on success, -1 on failure (terrno is TSDB_CODE_MSG_DECODE_ERROR when decoding failed)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // note the space before "for table": without it the version and the text run together in the log
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
  return -1;
}
C
Cary Xu 已提交
907 908 909 910 911 912 913 914 915 916 917 918

/**
 * @brief Create a TSMA from an encoded request buffer (specific for smaDstVnode).
 *
 * Delegates to vnodeProcessCreateTSmaReq() with a fixed version of 1 and no RPC response object.
 *
 * @param pVnode  vnode the request is applied to
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont in bytes
 * @return int32_t the result of vnodeProcessCreateTSmaReq()
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  int32_t code = vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);

  return code;
}
919 920 921 922 923 924 925 926 927

/**
 * @brief Handle an alter-replica confirm message: log it and fill in an empty success response.
 *
 * @param pVnode  vnode that received the message (used for the vgId in the log line)
 * @param version not used
 * @param pReq    not used
 * @param len     not used
 * @param pRsp    response to fill in; must not be NULL (dereferenced unconditionally)
 * @return always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // there is nothing to apply; reply with a body-less TDMT_VND_ALTER_CONFIRM_RSP
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
929

930
/**
 * @brief Handle an alter-hashrange message. Currently a stub: it only logs and reports success.
 *
 * None of the parameters other than pVnode (used for the vgId in the log line) is read.
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  /*
   * todo
   *   1. stop work
   *   2. adjust hash range / compact / remove wals / rename vgroups
   *   3. reload sync
   */
  return 0;
}
H
Hongze Cheng 已提交
939

940 941 942 943 944 945 946 947 948
/**
 * @brief Apply an alter-vnode-config request; only a change of cacheLastSize is acted upon here.
 *
 * When the requested cacheLastSize differs from the current one, the in-memory config is updated and
 * the new size, multiplied by 1024 * 1024, is passed to tsdbCacheSetCapacity(). The config is not
 * persisted yet (see TODO).
 *
 * @param pVnode  vnode whose config is altered
 * @param version not used
 * @param pReq    serialized SAlterVnodeReq
 * @param len     length of pReq in bytes
 * @param pRsp    not used
 * @return 0 on success, TSDB_CODE_INVALID_MSG (also stored in terrno) if the request cannot be deserialized
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq req = {0};

  if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), req.cacheLast,
        req.cacheLastSize);

  // nothing to do when the cache size is unchanged
  if (pVnode->config.cacheLastSize == req.cacheLastSize) {
    return 0;
  }

  pVnode->config.cacheLastSize = req.cacheLastSize;
  // TODO: save config
  size_t capacity = (size_t)pVnode->config.cacheLastSize * 1024 * 1024;
  tsdbCacheSetCapacity(pVnode, capacity);

  return 0;
}

H
Hongze Cheng 已提交
957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982
/**
 * @brief Decode a delete result message and delete the given time range from every listed table.
 *
 * The decoded SDeleteRes supplies suid, the uid list and the [skey, ekey] range handed to
 * tsdbDeleteTableData(). Processing stops at the first table for which the delete fails.
 * The uid list and the decoder are released on every path.
 *
 * @param pVnode  vnode the delete is applied to
 * @param version version passed through to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq in bytes
 * @param pRsp    not used
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be allocated,
 *         TSDB_CODE_INVALID_MSG if the request cannot be decoded, otherwise the first non-zero
 *         code returned by tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t    code = 0;
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(&coder, pReq, len);

  // never delete with a partially decoded key range or uid list
  if (tDecodeDeleteRes(&coder, &res) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(res.uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, res.suid, *(uint64_t *)taosArrayGet(res.uidList, iUid),
                               res.skey, res.ekey);
    if (code) goto _exit;
  }

_exit:
  // shared cleanup for the success and the failure paths
  tDecoderClear(&coder);
  taosArrayDestroy(res.uidList);
  return code;
}