vnodeSvr.c 28.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
// Forward declarations of the per-message write handlers dispatched from vnodeProcessWriteMsg().
// All handlers share one signature: (vnode, version, request body, body length, response out).

// super table handlers
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

// table handlers
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

// data handlers
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

// vnode alter handlers
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
30

31
/*
 * Pre-process a write message in place.
 *
 *  - TDMT_VND_CREATE_TABLE: for every request in the batch, writes a freshly generated uid and the
 *    current timestamp (ms) into the 16 bytes of the encoded payload that follow the table name.
 *  - TDMT_VND_SUBMIT: for every block that carries an embedded schema (schemaLen > 0), looks the
 *    table up by name (generating a new uid when the lookup returns 0) and writes uid and timestamp
 *    into the payload; the block header uid is stored big-endian.
 *  - TDMT_VND_DELETE: runs qWorkerProcessDeleteMsg() and replaces pMsg->pCont / pMsg->contLen with a
 *    newly allocated buffer holding an SMsgHead followed by the encoded SDeleteRes.
 *  - any other type: left untouched.
 *
 * Returns 0 on success, otherwise the failing error code (also logged).
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;  // stays 0 (loop skipped) if the count is not written by the decoder

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
        tDecodeCStr(&dc, &name);
        // Patch uid and create-time straight into the encoded message at the decoder position.
        // NOTE(review): no bounds/alignment check is made on dc.data + dc.pos - confirm the encoder
        // always reserves these 16 bytes.
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        // Only blocks with an embedded create-table request need patching.
        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
          tDecodeCStr(&dc, &name);

          // Reuse the uid of an existing table; a lookup result of 0 means a new uid is generated.
          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      int32_t     size;
      int32_t     ret;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // Do not write through a NULL buffer; leave the original message content untouched.
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      // Swap the request body for the encoded delete result.
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

136
/*
 * Apply one write message to the vnode at the given version.
 *
 * Records `version` as the applied state, dispatches on pMsg->msgType to the matching handler,
 * pushes the message to TQ and finally commits when vnodeShouldCommit() reports it is needed.
 * TDMT_VND_COMMIT jumps straight to the commit step, skipping the TQ push.
 *
 * pRsp is filled in by the individual handlers.
 * Returns 0 on success, -1 on failure (the reason is logged from terrno).
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
    } break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // The submit handler receives the full content, header included.
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    // NOTE(review): the results of the two alter handlers below are not checked - confirm intended.
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      break;
    case TDMT_VND_COMMIT:
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:  // also entered directly for TDMT_VND_COMMIT
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

254
/*
 * Pre-process a message from the query queue.
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to the query worker; every other
 * message type is accepted as-is and 0 is returned.
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/*
 * Process a message taken from the vnode query queue.
 * A TDMT_SCH_QUERY arriving on a non-leader vnode is redirected and 0 is returned. Otherwise the
 * message is handed to the query worker; unknown types yield TSDB_CODE_VND_APP_ERROR.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_SCH_QUERY) {
    if (!vnodeIsLeader(pVnode)) {
      vnodeRedirectRpcMsg(pVnode, pMsg);
      return 0;
    }
  }

  // Read-side view of this vnode handed to the query worker.
  SReadHandle readHandle = {
      .meta = pVnode->pMeta,
      .config = &pVnode->config,
      .vnode = pVnode,
      .pMsgCb = &pVnode->msgCb,
  };

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }
  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

282
/*
 * Process a message taken from the vnode fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META and TDMT_VND_TABLE_CFG are redirected (returning 0) when this
 * vnode is not the leader. Every other case is dispatched to the query worker, the meta getters or
 * the TQ/stream handlers; pInfo->workerId is only used for TDMT_VND_CONSUME.
 *
 * Returns the handler's result, or TSDB_CODE_VND_APP_ERROR for an unknown message type.
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG) &&
      !vnodeIsLeader(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
336
  blockDebugShowDataBlocks(data, __func__);
337
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
338 339
}

D
dapan1121 已提交
340 341 342 343 344 345 346
/*
 * Fill the vnode-level fields of a table meta response from this vnode's configuration:
 * vgroup id, database id, timestamp precision and database name.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  // NOTE(review): unbounded copy - assumes dbFName is large enough for config.dbname; confirm.
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

L
Liu Jicong 已提交
347
/*
 * Handle TDMT_VND_DROP_TTL_TABLE.
 *
 * The request body is a single 32-bit value in network byte order, which is passed to
 * metaTtlDropTable(). The uids that call collects are then handed to the TQ via
 * tqUpdateTbUidList(..., false).
 *
 * version and pRsp are not used by this handler.
 * Returns the result of metaTtlDropTable(), TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be
 * allocated, or -1 (terrno = TSDB_CODE_INVALID_MSG) if the body is too short.
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // The body must hold at least the 4-byte timestamp read below.
  if (pReq == NULL || len < (int32_t)sizeof(int32_t)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // the caller logs terrno on failure
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t t = ntohl(*(int32_t *)pReq);
  // NOTE(review): this is logged at error level although it is not an error - confirm the level.
  vError("rec ttl time:%d", t);
  int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

end:
  taosArrayDestroy(tbUids);
  return ret;
}

366
/*
 * Handle TDMT_VND_CREATE_STB: decode the request, create the super table in meta and run the
 * rollup-SMA create step.
 *
 * pRsp is initialised to an empty TDMT_VND_CREATE_STB_RSP; on failure pRsp->code is set to terrno.
 * Returns 0 on success, -1 on failure.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  int32_t        rcode = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  if (tdProcessRSmaCreate(pVnode, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  rcode = 0;

_exit:
  // Shared cleanup for the success and the failure paths.
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
  tDecoderClear(&coder);
  return rcode;
}

405
/*
 * Handle TDMT_VND_CREATE_TABLE: decode a batch of create-table requests, create each table and
 * build a batch response carrying one result code per request.
 *
 * For each request the full name "<dbname>.<table>" is validated against the vnode's hash range;
 * a mismatch is reported as TSDB_CODE_VND_HASH_MISMATCH for that entry. "Already exists" counts as
 * success when the request carries TD_CREATE_IF_NOT_EXISTS. The uids of newly created tables are
 * passed on to the TQ and SMA uid lists.
 *
 * On success pRsp->pCont / pRsp->contLen hold the encoded SVCreateTbBatchRsp.
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // Declared up front so that every `goto _exit` sees a zero-initialised encoder.
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash (bounded write: tbName is a fixed-size buffer)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;  // do not advertise a length for a buffer that does not exist
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  // Single cleanup point: the decoder and encoder are each cleared exactly once.
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

493
/*
 * Handle TDMT_VND_ALTER_STB: decode the request (it uses the create-stb layout) and apply it with
 * metaAlterSTable().
 *
 * pRsp is initialised to an empty TDMT_VND_ALTER_STB_RSP; on failure pRsp->code is set to terrno.
 * Returns 0 on success, -1 on failure.
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;  // keep the response code in step with the failure, as the branch below does
    rcode = -1;
    goto _exit;
  }

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
    goto _exit;
  }

_exit:
  tDecoderClear(&dc);
  return rcode;
}

522
/*
 * Handle TDMT_VND_DROP_STB: decode the request and drop the super table from meta.
 *
 * The outcome is reported through pRsp->code (TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG on a decode
 * failure, or terrno from metaDropSTable()). The function itself always returns 0.
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // This is the drop handler, so the response must be tagged as a drop response
  // (it was previously tagged TDMT_VND_CREATE_STB_RSP).
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

551
/*
 * Handle TDMT_VND_ALTER_TABLE: decode the request, apply it with metaAlterTable() and encode an
 * SVAlterTbRsp into pRsp.
 *
 * Decode and alter failures are reported inside the encoded response
 * (vAlterTbRsp.code = TSDB_CODE_INVALID_MSG) and the function still returns 0. When
 * metaAlterTable() hands back schemas, the refreshed table meta is attached to the response.
 *
 * Returns 0 once a response has been encoded, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 * response buffer cannot be allocated.
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  int32_t       ret;
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

_exit:
  tDecoderClear(&dc);

  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // Never encode into a NULL buffer.
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  // NOTE(review): vMetaRsp.pSchemas is not released here - confirm who owns it.
  return 0;
}

598
/*
 * Handle TDMT_VND_DROP_TABLE: decode a batch of drop-table requests, drop each table and encode a
 * batch response carrying one result code per request.
 *
 * "Table does not exist" counts as success when the request sets igNotExists. The uid list filled
 * in by metaDropTable() is handed to the TQ via tqUpdateTbUidList(..., false).
 *
 * A decode or allocation failure is reported through pRsp->code and a response is still encoded.
 * Returns 0 once a response has been encoded, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 * response buffer cannot be allocated.
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  int32_t          rcode = 0;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // Report the failure instead of answering with a success code.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // Never encode into a NULL buffer.
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }
  // The per-request result array was previously never released.
  taosArrayDestroy(rsp.pArray);
  return rcode;
}

657 658
/*
 * Debug helper: print every row of one submit block.
 *
 * The schema is looked up by msgIter->suid and the schema version of the block's first row; each
 * printed row is prefixed with "<tags>: uid <uid> ".
 *
 * Returns 0 when the block has no rows, -1 when no schema is found, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter rowIter = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &rowIter);
  if (rowIter.row == NULL) return 0;

  // TODO: use the real schema
  STSchema *pTSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(rowIter.row));
  if (pTSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char prefix[128] = {0};
  snprintf(prefix, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);

  for (STSRow *pRow = tGetSubmitBlkNext(&rowIter); pRow != NULL; pRow = tGetSubmitBlkNext(&rowIter)) {
    tdSRowPrint(pRow, pTSchema, prefix);
  }

  taosMemoryFreeClear(pTSchema);

  return TSDB_CODE_SUCCESS;
}

692
/*
 * Debug helper: walk every block of a submit request and print its rows via
 * vnodeDebugPrintSingleSubmitMsg().
 * Returns 0 after the last block, or -1 if the iterator cannot be initialised or advanced.
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter blkIter = {0};
  SSubmitBlk    *pBlk = NULL;
  SMeta         *pMetaStore = pVnode->pMeta;

  if (tInitSubmitMsgIter(pMsg, &blkIter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&blkIter, &pBlk) < 0) {
      return -1;
    }
    if (pBlk == NULL) {
      return 0;  // no more blocks
    }

    vnodeDebugPrintSingleSubmitMsg(pMetaStore, pBlk, &blkIter, tags);
  }
}

709
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
710
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
711
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
712 713 714 715
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
716
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
717
  int32_t        nRows;
H
Hongze Cheng 已提交
718 719
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
720
  SArray        *newTbUids = NULL;
721
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
722 723

  pRsp->code = 0;
C
Cary Xu 已提交
724

C
Cary Xu 已提交
725 726 727 728
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
729 730 731 732 733
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
734
  // handle the request
H
Hongze Cheng 已提交
735 736 737
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
738 739
  }

C
Cary Xu 已提交
740 741
  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
742 743 744 745 746
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
747
  for (;;) {
H
Hongze Cheng 已提交
748 749 750
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
751 752
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
753 754
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
755 756
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
757 758
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
759
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
760
        tDecoderClear(&decoder);
H
Hongze Cheng 已提交
761 762 763 764 765
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
766
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
767
          tDecoderClear(&decoder);
H
Hongze Cheng 已提交
768 769 770
          goto _exit;
        }
      }
771
      taosArrayPush(newTbUids, &createTbReq.uid);
H
Hongze Cheng 已提交
772

H
Hongze Cheng 已提交
773
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
774
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
D
dapan 已提交
775
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
776

H
Hongze Cheng 已提交
777 778 779 780 781 782 783
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

wmmhello's avatar
wmmhello 已提交
784
#ifdef TD_DEBUG_PRINT_ROW
D
dapan 已提交
785
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
wmmhello's avatar
wmmhello 已提交
786
#endif
H
Hongze Cheng 已提交
787
      tDecoderClear(&decoder);
D
dapan1121 已提交
788 789 790
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
791 792
    }

H
Hongze Cheng 已提交
793
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
794
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
795 796
    }

H
Hongze Cheng 已提交
797 798 799
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
800
  }
801
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
802

H
Hongze Cheng 已提交
803
_exit:
804
  taosArrayDestroy(newTbUids);
H
Hongze Cheng 已提交
805 806 807 808 809 810 811 812
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
813
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
814 815 816
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
817

818
  // TODO: the partial success scenario and the error case
M
Minglei Jin 已提交
819 820
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
821
  // TODO: refactor
C
Cary Xu 已提交
822
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
L
Liu Jicong 已提交
823
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
824
  }
C
Cary Xu 已提交
825

H
Hongze Cheng 已提交
826
  return 0;
L
Liu Jicong 已提交
827
}
828

829
/**
 * @brief Decode a create-TSMA request and pass it to tdProcessTSmaCreate().
 *
 * @param pVnode  vnode whose pSma handle receives the request
 * @param version version forwarded to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill; may be NULL, in which case no response fields are written
 * @return int32_t 0 on success, -1 on decode or processing failure (pRsp->code carries terrno when pRsp is given)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder;

  // Start from a clean success response; failure paths below overwrite the code.
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  // Log before releasing the decoder so no request field is read after tDecoderClear().
  // NOTE(review): only matters if a logged field is backed by decoder-owned memory - confirm.
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  tDecoderClear(&coder);
  return 0;

_err:
  // On a decode failure req may be only partially filled; it was zero-initialised above.
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
  tDecoderClear(&coder);
  return -1;
}
C
Cary Xu 已提交
865 866 867 868 869 870 871 872 873 874 875 876

/**
 * @brief Create a TSMA on the SMA destination vnode (specific for smaDstVnode).
 *
 * Forwards to vnodeProcessCreateTSmaReq() with a fixed version of 1 and no
 * response message.
 *
 * @param pVnode  vnode that receives the request
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont in bytes
 * @return int32_t the result of vnodeProcessCreateTSmaReq()
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  SRpcMsg *pNoRsp = NULL;  // this entry point produces no response message

  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, pNoRsp);
}
877 878 879 880 881 882 883 884 885

/**
 * @brief Handle an alter-replica confirm message: log it and fill pRsp with an
 *        empty success response.
 *
 * @param pVnode  vnode the message is addressed to (used for the log's vgId)
 * @param version not used by this handler
 * @param pReq    not used by this handler
 * @param len     not used by this handler
 * @param pRsp    response to fill; dereferenced unconditionally
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // Success response with no payload.
  pRsp->contLen = 0;
  pRsp->pCont = NULL;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
887 888 889 890

/**
 * @brief Placeholder handler for an alter-hashrange message; currently only logs.
 *
 * todo:
 *   1. stop work
 *   2. adjust hash range / compact / remove wals / rename vgroups
 *   3. reload sync
 *
 * @param pVnode  vnode the message is addressed to (used for the log's vgId)
 * @param version not used yet
 * @param pReq    not used yet
 * @param len     not used yet
 * @param pRsp    not used yet
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  return 0;
}
H
Hongze Cheng 已提交
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923

/**
 * @brief Decode a delete result message and delete the [skey, ekey] range from
 *        every table uid it lists.
 *
 * Stops at the first table whose deletion fails. The uid list and the decoder
 * are released on every path once they have been set up.
 *
 * @param pVnode  vnode whose pTsdb handle receives the deletes
 * @param version version forwarded to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq in bytes
 * @param pRsp    not used by this handler
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be
 *         allocated; TSDB_CODE_INVALID_MSG if the message cannot be decoded;
 *         otherwise the non-zero code returned by tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t    code = 0;
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    // Nothing has been acquired yet, so there is nothing to clean up.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(&coder, pReq, len);

  // A failed decode must not be applied: the uid list and key range may be incomplete.
  // NOTE(review): assumes tDecodeDeleteRes() reports failure with a negative return, like
  // the other tDecode* calls in this file - confirm.
  if (tDecodeDeleteRes(&coder, &res) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(res.uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, res.suid, *(uint64_t *)taosArrayGet(res.uidList, iUid),
                               res.skey, res.ekey);
    if (code) goto _exit;
  }

_exit:
  // Shared cleanup for success and failure so the decoder and uid list never leak.
  tDecoderClear(&coder);
  taosArrayDestroy(res.uidList);
  return code;
}