vnodeSvr.c 30.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/*
 * Forward declarations of the per-message handlers that vnodeProcessWriteMsg() dispatches to.
 * Every handler receives the vnode, the version passed to vnodeProcessWriteMsg(), the request
 * payload with its length, and the response message to fill in.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
32

33
/**
 * Pre-process a write message in place.
 *
 *  - TDMT_VND_CREATE_TABLE: for every request in the batch, writes a newly generated uid and the
 *    current timestamp into the message buffer at the decoder position right after the table name.
 *  - TDMT_VND_SUBMIT: for every block that carries a schema (msgIter.schemaLen > 0), writes the uid
 *    (looked up by table name, or newly generated when the lookup returns 0) and the timestamp into
 *    the buffer the same way, and stores the uid in the block header.
 *  - TDMT_VND_DELETE: runs qWorkerProcessDeleteMsg() and replaces pMsg->pCont / pMsg->contLen with
 *    a new buffer holding an SMsgHead followed by the encoded SDeleteRes.
 *  - any other message type is left untouched.
 *
 * @param pVnode vnode the message is addressed to
 * @param pMsg   message to pre-process; its content is modified in place
 * @return 0 on success, otherwise a non-zero error code (also logged)
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

      // a message whose header cannot be decoded must not drive the loop below
      if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, &nReqs) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&dc);
        goto _err;
      }

      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;

        if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, NULL) < 0 || tDecodeCStr(&dc, &name) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          tDecoderClear(&dc);
          goto _err;
        }

        // patch uid and creation time directly into the message buffer
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      // NOTE(review): the results of tInitSubmitMsgIter/tGetSubmitMsgNext are not checked here;
      // vnodeProcessSubmitReq() validates the iterator later - confirm this is intended.
      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);

          // without a decoded name the uid lookup below would receive NULL
          if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, NULL) < 0 || tDecodeCStr(&dc, &name) < 0) {
            code = TSDB_CODE_INVALID_MSG;
            tDecoderClear(&dc);
            goto _err;
          }

          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      int32_t     size = 0;
      int32_t     ret = 0;
      uint8_t    *pCont;
      SEncoder    coder = {0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // the header is written through pCont below, so a failed allocation must stop here
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(&coder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(&coder, &res);
      tEncoderClear(&coder);

      // swap the original request body for the encoded delete result
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

138
/**
 * Apply one write message to the vnode.
 *
 * Records `version` and the apply term in pVnode->state, dispatches the message to the handler
 * for its type, pushes the message to TQ and, when vnodeShouldCommit() says so (or the message is
 * TDMT_VND_COMMIT), commits the vnode and begins a new round.
 *
 * @param pVnode  target vnode
 * @param pMsg    request; pCont starts with an SMsgHead that most handlers skip
 * @param version version (log index) of this request
 * @param pRsp    response message filled in by the handler
 * @return 0 on success, -1 on failure (the reason is logged from terrno)
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler takes the whole message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    // NOTE(review): the results of the three alter handlers below are ignored - confirm intended
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      // an explicit commit request skips the TQ push and the should-commit check
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

261
/**
 * Pre-process a message from the query queue.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to qWorkerPreprocessQueryMsg();
 * every other message type is accepted unchanged.
 *
 * @return 0 for message types that need no pre-processing, otherwise the query worker's result
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/**
 * Handle a message taken from the vnode query queue.
 *
 * A TDMT_SCH_QUERY arriving at a vnode that is not the leader is redirected and 0 is returned.
 * Otherwise query / merge-query messages go to qWorkerProcessQueryMsg() and query-continue
 * messages go to qWorkerProcessCQueryMsg().
 *
 * @return the query worker's result, 0 after a redirect, or TSDB_CODE_VND_APP_ERROR for an
 *         unknown message type
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_SCH_QUERY) {
    if (!vnodeIsLeader(pVnode)) {
      vnodeRedirectRpcMsg(pVnode, pMsg);
      return 0;
    }
  }

  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }

  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

289
/**
 * Handle a message taken from the vnode fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META and TDMT_VND_TABLE_CFG are redirected (returning 0) when
 * this vnode is not the leader. Everything else is dispatched by message type to the query
 * worker, the meta readers or the TQ/stream handlers.
 *
 * @param pVnode target vnode
 * @param pMsg   message to process
 * @param pInfo  queue info; only workerId is used (for TDMT_VND_CONSUME)
 * @return the handler's result, 0 after a redirect, or TSDB_CODE_VND_APP_ERROR for an unknown
 *         message type
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG) &&
      !vnodeIsLeader(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
343
  blockDebugShowDataBlocks(data, __func__);
344
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
345 346
}

D
dapan1121 已提交
347 348 349 350 351 352 353
/**
 * Fill the vnode-level fields of a table meta response: vgroup id, database id, database name
 * and timestamp precision, all taken from the vnode configuration.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  // NOTE(review): unbounded copy - assumes dbFName is at least as large as config.dbname; confirm
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

S
Shengliang Guan 已提交
354
/**
 * Handle TDMT_VND_TRIM: decode the trim request and run TSDB retention for its timestamp.
 *
 * @param pVnode  target vnode
 * @param version request version (unused here)
 * @param pReq    serialized SVTrimDbReq
 * @param len     length of pReq in bytes
 * @param pRsp    response message (not modified here)
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be decoded, otherwise the
 *         result of tsdbDoRetention()
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    // the caller reports failures through terrno
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  // log only after decoding, otherwise the timestamp is always the zero initializer
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  // process
  return tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
}

L
Liu Jicong 已提交
374
/**
 * Handle TDMT_VND_DROP_TTL_TABLE: drop the tables selected by metaTtlDropTable() for the request
 * timestamp and, when any were dropped, pass their uids to tqUpdateTbUidList().
 *
 * @param pVnode  target vnode
 * @param version request version (unused here)
 * @param pReq    serialized SVDropTtlTableReq
 * @param len     length of pReq in bytes
 * @param pRsp    response message (not modified here)
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be allocated; -1 with
 *         terrno = TSDB_CODE_INVALID_MSG when decoding fails; otherwise the non-zero result of
 *         metaTtlDropTable()
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // declared up front so no `goto end` can skip its initialization
  int32_t           ret = -1;
  SVDropTtlTableReq ttlReq = {0};

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;  // ret is still -1
  }

  vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

end:
  taosArrayDestroy(tbUids);
  return ret;
}

398
/**
 * Handle TDMT_VND_CREATE_STB: decode the request, create the super table in meta and run the
 * rollup-SMA create step.
 *
 * The three steps run in order and stop at the first failure, in which case terrno is copied
 * into pRsp->code.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req; `||` short-circuits, so later steps are skipped after a failure
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0 || metaCreateSTable(pVnode->pMeta, version, &req) < 0 ||
      tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  // common cleanup for both outcomes
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
  tDecoderClear(&coder);
  return rcode;
}

437
/**
 * Handle TDMT_VND_CREATE_TABLE: create every table of a batch request and answer with one
 * SVCreateTbRsp per request.
 *
 * Per-table failures are reported in the encoded batch response, not through the return value.
 * A table that already exists counts as success when the request carries
 * TD_CREATE_IF_NOT_EXISTS. The uids of newly created tables are passed on to TQ and to SMA.
 *
 * @param pVnode  target vnode
 * @param version version of this request
 * @param pReq    encoded SVCreateTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont/contLen receive the encoded SVCreateTbBatchRsp
 * @return 0 on success, -1 (terrno set) when the batch cannot be decoded or memory runs out
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // declared before the first `goto _exit` so the cleanup never sees an uninitialized encoder
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq *pCreateReq = req.pReqs + iReq;

    // validate hash (bounded format: the name can never overrun tbName)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  // single cleanup point: decoder and encoder are each cleared exactly once
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

525
/**
 * Handle TDMT_VND_ALTER_STB: decode the request and alter the super table in meta.
 *
 * A decode failure sets terrno to TSDB_CODE_INVALID_MSG; a meta failure copies terrno into
 * pRsp->code.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int32_t        rcode = -1;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    // decode req failed
    terrno = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
  } else {
    rcode = 0;
  }

  // one cleanup point shared by all three outcomes
  tDecoderClear(&dc);
  return rcode;
}

554
/**
 * Handle TDMT_VND_DROP_STB: decode the request, drop the super table from meta and run the
 * rollup-SMA drop step.
 *
 * The outcome is reported through pRsp->code: TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG for a
 * decode failure, or terrno of the failing step. The function itself always returns 0.
 *
 * @return always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // this is the drop handler, so answer with the drop response type (was the create type)
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
  }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

588
/**
 * Handle TDMT_VND_ALTER_TABLE: decode the request, alter the table in meta and answer with an
 * encoded SVAlterTbRsp.
 *
 * Decode and meta failures are reported inside the encoded response (vAlterTbRsp.code); the
 * function still returns 0 in those cases. When metaAlterTable() fills vMetaRsp.pSchemas, the
 * table meta is attached to the response.
 *
 * @return 0 when a response was produced, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
 *         response buffer cannot be allocated
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
  int32_t       ret = 0;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    // process: report the real reason, falling back to INVALID_MSG if terrno was not set
    vAlterTbRsp.code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
  } else if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }
  tDecoderClear(&dc);

  // encode the response
  // NOTE(review): vMetaRsp.pSchemas is not released here - confirm who owns it
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->contLen = 0;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  return 0;
}

635
/**
 * Handle TDMT_VND_DROP_TABLE: drop every table of a batch request and answer with one
 * SVDropTbRsp per request.
 *
 * Per-table failures are reported in the encoded batch response. A missing table counts as
 * success when the request sets igNotExists. The uid list filled by metaDropTable() is passed
 * to tqUpdateTbUidList().
 *
 * @param pVnode  target vnode
 * @param version version of this request
 * @param pReq    encoded SVDropTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont/contLen receive the encoded SVDropTbBatchRsp
 * @return 0 when a response was produced (request-level errors are in pRsp->code), -1 with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY when the response buffer cannot be allocated
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret = 0;
  int32_t          rcode = 0;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // do not report success when nothing was processed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);

  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->contLen = 0;
    rcode = -1;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the response is serialized by now; release the per-table result array
  taosArrayDestroy(rsp.pArray);
  return rcode;
}

694 695
/**
 * Debug helper: print every row of one submit block.
 *
 * The schema is looked up from meta by the super-table uid of the message iterator and the
 * schema version of the block's first row, and is released before returning.
 *
 * @param pMeta   meta handle used for the schema lookup
 * @param pBlock  submit block to print
 * @param msgIter message iterator positioned on pBlock (provides suid and uid)
 * @param tags    prefix for each printed line
 * @return 0 when the block has no rows, -1 when no schema is found, TSDB_CODE_SUCCESS otherwise
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTags[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

729
/**
 * Debug helper: walk every block of a submit request and print its rows through
 * vnodeDebugPrintSingleSubmitMsg().
 *
 * @param pVnode vnode whose meta is used for the schema lookups
 * @param pMsg   submit request; must not be NULL
 * @param tags   prefix for each printed line
 * @return 0 once all blocks were visited, -1 when the message iterator reports an error
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SMeta         *pMeta = pVnode->pMeta;
  SSubmitBlk    *pCurBlk = NULL;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pCurBlk) < 0) {
      return -1;
    }
    if (pCurBlk == NULL) {
      // iterator exhausted
      break;
    }

    vnodeDebugPrintSingleSubmitMsg(pMeta, pCurBlk, &iter, tags);
  }

  return 0;
}

746
/**
 * @brief Apply a submit (insert) request to the vnode and build the encoded
 *        SSubmitRsp that is returned through pRsp.
 *
 * For every block of the request:
 *   - if the block carries a create-table payload (msgIter.schemaLen > 0) the
 *     table is created first; "table already exists" is tolerated;
 *   - the rows are inserted with tsdbInsertTableData(); an insert failure is
 *     recorded in that block's response code and processing continues.
 *
 * The response is always encoded into pRsp->pCont, also on failure. The
 * overall failure reason, if any, is reported through pRsp->code.
 *
 * @param pVnode  vnode the data is written to
 * @param version version passed on to metaCreateTable() and tsdbInsertTableData()
 * @param pReq    raw SSubmitReq message
 * @param len     length of pReq (not used by this function)
 * @param pRsp    response message; code, pCont and contLen are filled in
 * @return int32_t always 0; callers must inspect pRsp->code
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  SEncoder       encoder = {0};
  SArray        *newTbUids = NULL;
  int32_t        tsize = 0;
  int32_t        ret = 0;

  terrno = TSDB_CODE_SUCCESS;
  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
  // both arrays are pushed to inside the loop, so both must exist
  if (!submitRsp.pArray || !newTbUids) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      // Malformed message: report it, but still fall through to the tq
      // notification below so tables created so far are not forgotten.
      pRsp->code = TSDB_CODE_INVALID_MSG;
      break;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // submitBlkRsp is never pushed on this path, so the failure has to
          // be surfaced through the message-level code.
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }
      taosArrayPush(newTbUids, &createTbReq.uid);

      submitBlkRsp.uid = createTbReq.uid;

      // room for "<dbname>." + table name + terminating NUL
      size_t fnameSize = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameSize);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        tDecoderClear(&decoder);
        goto _exit;
      }
      // only the "<dbname>." prefix is written here
      // NOTE(review): the table name is presumably appended by tsdbInsertTableData — confirm
      snprintf(submitBlkRsp.tblFName, fnameSize, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

#ifdef TD_DEBUG_PRINT_ROW
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
      tDecoderClear(&decoder);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    // a failed insert is reported per block; remaining blocks are still processed
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);

_exit:
  taosArrayDestroy(newTbUids);

  // encode whatever was collected so far, even on failure
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont != NULL) {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  } else {
    // do not advertise a payload length for a buffer that does not exist
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // the per-block table names are owned by this function
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
  }

  return 0;
}
865

866
/**
 * @brief Decode a create-TSMA request and hand it to tdProcessTSmaCreate().
 *
 * @param pVnode  vnode on which the TSMA is created
 * @param version version passed on to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq as given to the decoder
 * @param pRsp    optional response message; may be NULL. When present it is
 *                initialised as an empty TDMT_VND_CREATE_SMA_RSP and its code
 *                is set to terrno on failure.
 * @return int32_t 0 on success, -1 on decode or processing failure (terrno set)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  // the request struct is passed as an opaque byte pointer
  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
  return -1;
}
C
Cary Xu 已提交
902 903 904 905 906 907 908 909 910 911 912 913

/**
 * @brief Create a TSMA on this vnode; specific for smaDstVnode.
 *
 * Forwards to vnodeProcessCreateTSmaReq() with a fixed version of 1 and
 * without an RPC response object.
 *
 * @param pVnode  vnode on which the TSMA is created
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont as given to the decoder
 * @return int32_t the result of vnodeProcessCreateTSmaReq(): 0 on success, -1 on failure
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  const int64_t fixedVersion = 1;
  SRpcMsg      *pNoRsp = NULL;

  return vnodeProcessCreateTSmaReq(pVnode, fixedVersion, pCont, (int32_t)contLen, pNoRsp);
}
914 915 916 917 918 919 920 921 922

/**
 * @brief Acknowledge an alter-replica confirm message.
 *
 * Nothing is applied to the vnode; the function only logs the event and fills
 * pRsp with an empty, successful TDMT_VND_ALTER_CONFIRM_RSP.
 *
 * @param pVnode  vnode the message is addressed to (used for logging only)
 * @param version not used
 * @param pReq    not used
 * @param len     not used
 * @param pRsp    response message to fill in; dereferenced unconditionally
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // empty payload, success status
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
924

925
/**
 * @brief Placeholder handler for an alter-hash-range message.
 *
 * Currently only logs the event; no request data is read and pRsp is left
 * untouched.
 *
 * @param pVnode  vnode the message is addressed to (used for logging only)
 * @param version not used
 * @param pReq    not used
 * @param len     not used
 * @param pRsp    not used
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  const int32_t code = 0;

  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  /*
   * TODO: not implemented yet. Planned steps:
   *   1. stop work
   *   2. adjust hash range / compact / remove wals / rename vgroups
   *   3. reload sync
   */
  return code;
}
H
Hongze Cheng 已提交
934

935 936 937 938 939 940 941 942 943
/**
 * @brief Apply an alter-vnode-config request.
 *
 * Only a change of cacheLastSize is acted upon: the new value is stored in
 * pVnode->config and the cache capacity is updated. cacheLast is logged but
 * not applied here.
 *
 * @param pVnode  vnode whose configuration is altered
 * @param version not used
 * @param pReq    serialized SAlterVnodeReq
 * @param len     length of pReq as given to the deserializer
 * @param pRsp    not used
 * @return int32_t 0 on success, TSDB_CODE_INVALID_MSG (also stored in terrno)
 *         if the request cannot be deserialized
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq req = {0};

  if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), req.cacheLast,
        req.cacheLastSize);

  // nothing to do when the size is unchanged
  if (pVnode->config.cacheLastSize == req.cacheLastSize) {
    return 0;
  }

  pVnode->config.cacheLastSize = req.cacheLastSize;
  // TODO: save config
  // NOTE(review): the 1024 * 1024 factor suggests cacheLastSize is in MiB — confirm
  tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);

  return 0;
}

H
Hongze Cheng 已提交
952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
/**
 * @brief Decode a delete result message and delete the given time range from
 *        every table listed in it.
 *
 * The uid list and the decoder are released on every exit path, including
 * when a delete fails part-way through the list.
 *
 * @param pVnode  vnode whose tsdb the data is deleted from
 * @param version version passed on to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq as given to the decoder
 * @param pRsp    not used
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY if the uid list
 *         cannot be allocated; TSDB_CODE_INVALID_MSG if decoding fails;
 *         otherwise the first non-zero code from tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t    code = 0;
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(&coder, pReq, len);
  if (tDecodeDeleteRes(&coder, &res) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(res.uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, res.suid, *(uint64_t *)taosArrayGet(res.uidList, iUid),
                               res.skey, res.ekey);
    // stop at the first failure; tables already processed are not rolled back
    if (code) break;
  }

_exit:
  tDecoderClear(&coder);
  taosArrayDestroy(res.uidList);
  return code;
}