vnodeSvr.c 30.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
27 28
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
29
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
S
Shengliang Guan 已提交
30
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
31
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
32

33
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
34
  int32_t  code = 0;
H
Hongze Cheng 已提交
35
  SDecoder dc = {0};
H
Hongze Cheng 已提交
36

H
Hongze Cheng 已提交
37 38 39 40 41 42 43 44 45 46 47
  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
H
Hongze Cheng 已提交
48
        char    *name = NULL;
H
Hongze Cheng 已提交
49 50 51
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
H
Hongze Cheng 已提交
52
        tDecodeCStr(&dc, &name);
H
Hongze Cheng 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
H
Hongze Cheng 已提交
67
      tb_uid_t       uid;
H
Hongze Cheng 已提交
68 69 70 71 72 73 74 75

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
76
          char *name = NULL;
H
Hongze Cheng 已提交
77

H
Hongze Cheng 已提交
78 79 80 81
          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
H
Hongze Cheng 已提交
82 83 84 85 86 87
          tDecodeCStr(&dc, &name);

          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
H
Hongze Cheng 已提交
88
          *(int64_t *)(dc.data + dc.pos) = uid;
H
Hongze Cheng 已提交
89
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
H
Hongze Cheng 已提交
90
          pBlock->uid = htobe64(uid);
H
Hongze Cheng 已提交
91 92 93 94 95 96 97

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
H
Hongze Cheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
    case TDMT_VND_DELETE: {
      int32_t     size;
      int32_t     ret;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
H
Hongze Cheng 已提交
127 128 129
    default:
      break;
  }
H
Hongze Cheng 已提交
130

S
Shengliang Guan 已提交
131
  return code;
H
Hongze Cheng 已提交
132 133 134 135

_err:
  vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
136 137
}

138
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
139 140 141 142
  void   *ptr = NULL;
  void   *pReq;
  int32_t len;
  int32_t ret;
H
Hongze Cheng 已提交
143

144
  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
145
         version);
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147
  pVnode->state.applied = version;
H
Hongze Cheng 已提交
148
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
H
Hongze Cheng 已提交
149

H
Hongze Cheng 已提交
150 151 152
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
H
Hongze Cheng 已提交
153 154

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
155
    /* META */
H
Hongze Cheng 已提交
156
    case TDMT_VND_CREATE_STB:
H
Hongze Cheng 已提交
157
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
158
      break;
H
Hongze Cheng 已提交
159
    case TDMT_VND_ALTER_STB:
H
Hongze Cheng 已提交
160
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
161
      break;
H
Hongze Cheng 已提交
162
    case TDMT_VND_DROP_STB:
H
Hongze Cheng 已提交
163
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
164
      break;
H
Hongze Cheng 已提交
165
    case TDMT_VND_CREATE_TABLE:
H
Hongze Cheng 已提交
166
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
167 168
      break;
    case TDMT_VND_ALTER_TABLE:
H
Hongze Cheng 已提交
169
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
170
      break;
H
Hongze Cheng 已提交
171
    case TDMT_VND_DROP_TABLE:
H
Hongze Cheng 已提交
172
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
173
      break;
174
    case TDMT_VND_DROP_TTL_TABLE:
175
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
176
      break;
S
Shengliang Guan 已提交
177 178 179 180
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
181
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
182
      break;
H
Hongze Cheng 已提交
183
    /* TSDB */
H
Hongze Cheng 已提交
184
    case TDMT_VND_SUBMIT:
H
Hongze Cheng 已提交
185
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
186
      break;
D
dapan1121 已提交
187
    case TDMT_VND_DELETE:
H
Hongze Cheng 已提交
188
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
D
dapan1121 已提交
189
      break;
H
Hongze Cheng 已提交
190
    /* TQ */
L
Liu Jicong 已提交
191 192 193
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
194
        goto _err;
L
Liu Jicong 已提交
195 196
      }
      break;
L
Liu Jicong 已提交
197 198
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
199 200 201 202 203 204 205
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
L
Liu Jicong 已提交
206 207
      }
      break;
208
    case TDMT_STREAM_TASK_DEPLOY: {
L
Liu Jicong 已提交
209 210
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
211
        goto _err;
H
Hongze Cheng 已提交
212 213
      }
    } break;
L
Liu Jicong 已提交
214
    case TDMT_STREAM_TASK_DROP: {
L
Liu Jicong 已提交
215 216 217 218
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
219 220 221
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
S
Shengliang Guan 已提交
222
    case TDMT_VND_ALTER_HASHRANGE:
223
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
S
Shengliang Guan 已提交
224
      break;
225
    case TDMT_VND_ALTER_CONFIG:
226
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
S
Shengliang Guan 已提交
227
      break;
H
Hongze Cheng 已提交
228 229
    case TDMT_VND_COMMIT:
      goto _do_commit;
H
Hongze Cheng 已提交
230 231 232 233 234
    default:
      ASSERT(0);
      break;
  }

235
  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
H
Hongze Cheng 已提交
236

237
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
S
Shengliang Guan 已提交
238
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
239 240 241
    return -1;
  }

H
Hongze Cheng 已提交
242
  // commit if need
H
Hongze Cheng 已提交
243
  if (vnodeShouldCommit(pVnode)) {
H
Hongze Cheng 已提交
244
  _do_commit:
S
Shengliang Guan 已提交
245
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
H
Hongze Cheng 已提交
246 247 248 249 250
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
H
Hongze Cheng 已提交
251 252 253
  }

  return 0;
H
Hongze Cheng 已提交
254 255

_err:
S
Shengliang Guan 已提交
256
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
257 258
         tstrerror(terrno), version);
  return -1;
H
Hongze Cheng 已提交
259 260
}

261
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan1121 已提交
262
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
D
dapan1121 已提交
263 264 265 266 267 268 269
    return 0;
  }

  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
}

int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
270
  vTrace("message in vnode query queue is processing");
D
dapan1121 已提交
271
  if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
272 273 274 275
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

276
  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Hongze Cheng 已提交
277
  switch (pMsg->msgType) {
D
dapan1121 已提交
278
    case TDMT_SCH_QUERY:
D
dapan1121 已提交
279
    case TDMT_SCH_MERGE_QUERY:
D
dapan1121 已提交
280
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
281
    case TDMT_SCH_QUERY_CONTINUE:
D
dapan1121 已提交
282
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
283 284 285 286 287 288
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

289
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
H
Hongze Cheng 已提交
290
  vTrace("message in fetch queue is processing");
M
Minglei Jin 已提交
291 292 293
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG) &&
      !vnodeIsLeader(pVnode)) {
294 295 296 297
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

H
Hongze Cheng 已提交
298 299
  char   *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
C
Cary Xu 已提交
300

H
Hongze Cheng 已提交
301
  switch (pMsg->msgType) {
D
dapan1121 已提交
302
    case TDMT_SCH_FETCH:
D
dapan1121 已提交
303
    case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
304
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
305
    case TDMT_SCH_FETCH_RSP:
D
dapan1121 已提交
306
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
307
    case TDMT_SCH_CANCEL_TASK:
D
dapan1121 已提交
308
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
309
    case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
310
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
311
    case TDMT_SCH_QUERY_HEARTBEAT:
D
dapan1121 已提交
312
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
313 314
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
D
dapan1121 已提交
315 316
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
H
Hongze Cheng 已提交
317 318
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
319 320 321
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
322
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
323
    case TDMT_STREAM_TASK_RECOVER:
L
Liu Jicong 已提交
324
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
325 326
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
327
    case TDMT_STREAM_TASK_DISPATCH_RSP:
L
Liu Jicong 已提交
328
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
329
    case TDMT_STREAM_TASK_RECOVER_RSP:
L
Liu Jicong 已提交
330
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
331 332
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
333 334 335 336 337 338 339 340 341 342
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
343
  blockDebugShowDataBlocks(data, __func__);
344
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
345 346
}

D
dapan1121 已提交
347 348 349 350 351 352 353
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

S
Shengliang Guan 已提交
354
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
355
  int32_t     code = 0;
S
Shengliang Guan 已提交
356 357 358
  SVTrimDbReq trimReq = {0};

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
H
Hongze Cheng 已提交
359 360 361 362 363

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
S
Shengliang Guan 已提交
364 365
  }

H
Hongze Cheng 已提交
366 367 368 369 370 371
  // process
  code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
  if (code) goto _exit;

_exit:
  return code;
S
Shengliang Guan 已提交
372 373
}

L
Liu Jicong 已提交
374
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
375 376 377
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
378 379 380 381 382 383 384 385
  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
L
Liu Jicong 已提交
386
  if (ret != 0) {
387 388
    goto end;
  }
L
Liu Jicong 已提交
389
  if (taosArrayGetSize(tbUids) > 0) {
wmmhello's avatar
wmmhello 已提交
390 391
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }
392 393 394 395 396 397

end:
  taosArrayDestroy(tbUids);
  return ret;
}

398
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
399
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
400
  SDecoder       coder;
H
Hongze Cheng 已提交
401

H
Hongze Cheng 已提交
402 403 404 405 406 407
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
408
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
409 410

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
411 412
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
413 414
  }

H
Hongze Cheng 已提交
415
  if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
H
Hongze Cheng 已提交
416 417
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
418 419
  }

420
  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
421 422 423
    pRsp->code = terrno;
    goto _err;
  }
C
Cary Xu 已提交
424

M
Minglei Jin 已提交
425 426
  // taosMemoryFree(req.schemaRow.pSchema);
  // taosMemoryFree(req.schemaTag.pSchema);
H
Hongze Cheng 已提交
427
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
428
  return 0;
H
Hongze Cheng 已提交
429 430

_err:
M
Minglei Jin 已提交
431 432
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
H
Hongze Cheng 已提交
433
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
434
  return -1;
H
Hongze Cheng 已提交
435 436
}

437
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
438
  SDecoder           decoder = {0};
439
  int32_t            rcode = 0;
H
Hongze Cheng 已提交
440 441
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
442 443 444
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
445
  STbUidStore       *pStore = NULL;
446
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
447 448

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
449 450 451
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
452

H
Hongze Cheng 已提交
453
  // decode
H
Hongze Cheng 已提交
454 455
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
456 457 458 459
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
460

H
Hongze Cheng 已提交
461
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
462 463
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
464 465 466 467 468
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
469
  // loop to create table
470
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
471
    pCreateReq = req.pReqs + iReq;
H
Hongze Cheng 已提交
472 473 474 475 476 477 478 479 480 481

    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
H
Hongze Cheng 已提交
482
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
H
Hongze Cheng 已提交
483 484 485 486 487
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
488
    } else {
H
Hongze Cheng 已提交
489
      cRsp.code = TSDB_CODE_SUCCESS;
490
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
491
      taosArrayPush(tbUids, &pCreateReq->uid);
H
Hongze Cheng 已提交
492
    }
H
Hongze Cheng 已提交
493 494

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
495 496
  }

497
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
498 499
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
500

H
Hongze Cheng 已提交
501
  // prepare rsp
H
Hongze Cheng 已提交
502 503
  SEncoder encoder = {0};
  int32_t  ret = 0;
wafwerar's avatar
wafwerar 已提交
504
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
505 506 507 508 509 510
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
511 512
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
H
Hongze Cheng 已提交
513

H
Hongze Cheng 已提交
514
_exit:
wmmhello's avatar
wmmhello 已提交
515 516 517 518
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
H
Hongze Cheng 已提交
519
  taosArrayDestroy(rsp.pArray);
520
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
521 522
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
523
  return rcode;
H
Hongze Cheng 已提交
524 525
}

526
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
542
  }
H
Hongze Cheng 已提交
543 544 545 546 547 548 549 550 551

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
552 553 554
  return 0;
}

555
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
556
  SVDropStbReq req = {0};
557
  int32_t      rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
558
  SDecoder     decoder = {0};
H
Hongze Cheng 已提交
559 560 561 562 563 564

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
565 566
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
567 568 569 570 571
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
572 573 574 575
  if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
576

577 578 579 580 581
  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
582 583 584
  // return rsp
_exit:
  pRsp->code = rcode;
H
Hongze Cheng 已提交
585
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
586 587 588
  return 0;
}

589
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
D
dapan1121 已提交
590 591 592
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
593 594
  int32_t       rcode = 0;
  int32_t       ret;
D
dapan1121 已提交
595 596
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
H
Hongze Cheng 已提交
597 598 599 600 601 602 603 604 605 606

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
607
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
608
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
609 610
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
611 612 613
  }

  // process
D
dapan1121 已提交
614
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
615
    vAlterTbRsp.code = terrno;
H
Hongze Cheng 已提交
616
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
617 618
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
619 620
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
621

D
dapan1121 已提交
622 623 624 625 626
  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

H
Hongze Cheng 已提交
627 628 629 630 631 632
_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
M
Minglei Jin 已提交
633 634 635
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
  }
H
Hongze Cheng 已提交
636 637 638
  return 0;
}

639
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
640 641
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
642
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
643
  SEncoder         encoder = {0};
644
  int32_t          ret;
645
  SArray          *tbUids = NULL;
H
Hongze Cheng 已提交
646

H
Hongze Cheng 已提交
647
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
648 649 650
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
651 652

  // decode req
H
Hongze Cheng 已提交
653 654
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
655 656 657 658 659
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
660 661

  // process req
662
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
H
Hongze Cheng 已提交
663
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
664 665
  if (tbUids == NULL || rsp.pArray == NULL) goto _exit;

666
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
667 668
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
H
Hongze Cheng 已提交
669

H
Hongze Cheng 已提交
670
    /* code */
671
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
H
Hongze Cheng 已提交
672
    if (ret < 0) {
H
Hongze Cheng 已提交
673 674 675 676 677
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
678
    } else {
H
Hongze Cheng 已提交
679
      dropTbRsp.code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
680 681 682 683 684
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

685 686
  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

H
Hongze Cheng 已提交
687
_exit:
688
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
689
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
690 691 692 693 694
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
695
  taosArrayDestroy(rsp.pArray);
H
Hongze Cheng 已提交
696 697 698
  return 0;
}

699 700
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
D
dapan 已提交
701 702 703 704
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;
C
Cary Xu 已提交
705
  int32_t        rv = -1;
D
dapan 已提交
706 707 708

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
C
Cary Xu 已提交
709
  if (!pSchema || (suid != msgIter->suid) || rv != TD_ROW_SVER(blkIter.row)) {
D
dapan 已提交
710 711 712
    if (pSchema) {
      taosMemoryFreeClear(pSchema);
    }
C
Cary Xu 已提交
713
    pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
D
dapan 已提交
714 715
    if (pSchema) {
      suid = msgIter->suid;
C
Cary Xu 已提交
716
      rv = TD_ROW_SVER(blkIter.row);
D
dapan 已提交
717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733
    }
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

734
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
C
Cary Xu 已提交
735 736 737 738 739 740 741 742 743
  ASSERT(pMsg != NULL);
  SSubmitMsgIter msgIter = {0};
  SMeta         *pMeta = pVnode->pMeta;
  SSubmitBlk    *pBlock = NULL;

  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
  while (true) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
    if (pBlock == NULL) break;
H
Hongze Cheng 已提交
744

D
dapan 已提交
745 746
    vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
  }
C
Cary Xu 已提交
747 748 749 750

  return 0;
}

751
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
752
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
753
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
754 755 756 757
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
758
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
759
  int32_t        nRows;
H
Hongze Cheng 已提交
760 761
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
762
  SArray        *newTbUids = NULL;
763
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
764 765

  pRsp->code = 0;
C
Cary Xu 已提交
766

C
Cary Xu 已提交
767 768 769 770
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
771 772 773 774 775
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
776
  // handle the request
H
Hongze Cheng 已提交
777 778 779
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
780 781
  }

C
Cary Xu 已提交
782 783
  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
784 785 786 787 788
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
789
  for (;;) {
H
Hongze Cheng 已提交
790 791 792
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
793 794
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
795 796
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
797 798
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
799 800
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
801
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
802
        tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
803
        taosArrayDestroy(createTbReq.ctb.tagName);
H
Hongze Cheng 已提交
804 805 806 807 808
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
809
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
810
          tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
811
          taosArrayDestroy(createTbReq.ctb.tagName);
H
Hongze Cheng 已提交
812 813 814
          goto _exit;
        }
      }
815
      taosArrayPush(newTbUids, &createTbReq.uid);
H
Hongze Cheng 已提交
816

H
Hongze Cheng 已提交
817
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
818
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
D
dapan 已提交
819
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
820

H
Hongze Cheng 已提交
821 822 823 824 825 826 827
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

wmmhello's avatar
wmmhello 已提交
828
#ifdef TD_DEBUG_PRINT_ROW
D
dapan 已提交
829
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
wmmhello's avatar
wmmhello 已提交
830
#endif
H
Hongze Cheng 已提交
831
      tDecoderClear(&decoder);
wmmhello's avatar
wmmhello 已提交
832
      taosArrayDestroy(createTbReq.ctb.tagName);
D
dapan1121 已提交
833 834 835
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
836 837
    }

H
Hongze Cheng 已提交
838
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
839
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
840 841
    }

H
Hongze Cheng 已提交
842 843 844
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
845
  }
846
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
847

H
Hongze Cheng 已提交
848
_exit:
849
  taosArrayDestroy(newTbUids);
H
Hongze Cheng 已提交
850 851 852 853 854 855 856 857
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
858
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
859 860 861
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
862

863
  // TODO: the partial success scenario and the error case
H
Hongze Cheng 已提交
864 865
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
866
  // TODO: refactor
C
Cary Xu 已提交
867
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
L
Liu Jicong 已提交
868
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
869
  }
C
Cary Xu 已提交
870

H
Hongze Cheng 已提交
871
  return 0;
L
Liu Jicong 已提交
872
}
873

874
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
875
  SVCreateTSmaReq req = {0};
H
Hongze Cheng 已提交
876
  SDecoder        coder;
877

C
Cary Xu 已提交
878 879 880 881 882 883
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }
884 885 886 887 888

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
C
Cary Xu 已提交
889 890
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
891
    goto _err;
892
  }
C
Cary Xu 已提交
893

C
Cary Xu 已提交
894
  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
C
Cary Xu 已提交
895
    if (pRsp) pRsp->code = terrno;
896
    goto _err;
897
  }
C
Cary Xu 已提交
898

899
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
900
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
C
Cary Xu 已提交
901
         req.indexName, req.indexUid, version, req.tableUid);
H
Hongze Cheng 已提交
902
  return 0;
903 904 905

_err:
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
906
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
C
Cary Xu 已提交
907
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
908
  return -1;
L
Liu Jicong 已提交
909
}
C
Cary Xu 已提交
910 911 912 913 914 915 916 917 918 919 920 921

/**
 * @brief specific for smaDstVnode
 *
 * @param pVnode
 * @param pCont
 * @param contLen
 * @return int32_t
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
922 923 924 925 926 927 928 929 930

static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return 0;
931
}
S
Shengliang Guan 已提交
932

933
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
934 935
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

936 937
  // todo
  // 1. stop work
S
Shengliang Guan 已提交
938 939 940
  // 2. adjust hash range / compact / remove wals / rename vgroups
  // 3. reload sync
  return 0;
M
Minghao Li 已提交
941
}
H
Hongze Cheng 已提交
942

943 944 945 946 947 948 949 950 951
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
        alterReq.cacheLastSize);
952 953 954 955 956
  if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
    pVnode->config.cacheLastSize = alterReq.cacheLastSize;
    // TODO: save config
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }
957 958 959
  return 0;
}

H
Hongze Cheng 已提交
960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) goto _err;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;

_err:
  return code;
M
Minglei Jin 已提交
986
}