vnodeSvr.c 54.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tencode.h"
#include "tmsg.h"
H
Hongze Cheng 已提交
18
#include "vnd.h"
H
Hongze Cheng 已提交
19 20
#include "vnode.h"
#include "vnodeInt.h"
H
Hongze Cheng 已提交
21

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
39

H
Hongze Cheng 已提交
40
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
  int32_t code = 0;
  int32_t lino = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // name
  char *name = NULL;
  if (tDecodeCStr(pCoder, &name) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // uid
  int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
  if (uid == 0) {
    uid = tGenIdPI64();
  }
  *(int64_t *)(pCoder->data + pCoder->pos) = uid;

  // ctime
  *(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime;

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
H
Hongze Cheng 已提交
79
    if (pUid) *pUid = uid;
H
Hongze Cheng 已提交
80 81 82
  }
  return code;
}
H
Hongze Cheng 已提交
83 84 85 86 87
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t  ctime = taosGetTimestampMs();
H
Hongze Cheng 已提交
88
  SDecoder dc = {0};
H
Hongze Cheng 已提交
89
  int32_t  nReqs;
H
Hongze Cheng 已提交
90

H
Hongze Cheng 已提交
91 92 93 94 95 96 97 98 99 100 101
  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
H
Hongze Cheng 已提交
102
    code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL);
H
Hongze Cheng 已提交
103
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
104 105 106 107 108 109 110 111
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
  return code;
}
H
Hongze Cheng 已提交
112 113
extern int64_t tsMaxKeyByPrecision[];
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
H
Hongze Cheng 已提交
114 115 116
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
117 118 119 120
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
121

H
Hongze Cheng 已提交
122 123 124 125 126
  SSubmitTbData submitTbData;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
127

H
Hongze Cheng 已提交
128
  int64_t uid;
H
Hongze Cheng 已提交
129
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
H
Hongze Cheng 已提交
130
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid);
H
Hongze Cheng 已提交
131 132 133 134 135 136 137 138
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
139 140 141

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    *(int64_t *)(pCoder->data + pCoder->pos) = uid;
H
Hongze Cheng 已提交
142
    pCoder->pos += sizeof(int64_t);
H
Hongze Cheng 已提交
143
  } else {
H
Hongze Cheng 已提交
144 145 146 147
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
148
  }
H
Hongze Cheng 已提交
149

H
Hongze Cheng 已提交
150
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
H
Hongze Cheng 已提交
151 152 153 154
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163 164 165 166
  // scan and check
  TSKEY now = ctime;
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
H
Hongze Cheng 已提交
167
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
168
      goto _exit;
H
Hongze Cheng 已提交
169 170
    }

H
Hongze Cheng 已提交
171 172 173
    SColData colData = {0};
    pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);

H
Hongze Cheng 已提交
174 175 176 177 178
    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
179 180 181 182 183 184 185 186 187
    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
  } else {
    uint64_t nRow;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
H
Hongze Cheng 已提交
188
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
189
      goto _exit;
H
Hongze Cheng 已提交
190
    }
H
Hongze Cheng 已提交
191

H
Hongze Cheng 已提交
192 193 194
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
H
Hongze Cheng 已提交
195

H
Hongze Cheng 已提交
196 197 198
      if (pRow->ts < minKey || pRow->ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
H
Hongze Cheng 已提交
199
      }
H
Hongze Cheng 已提交
200 201
    }
  }
H
Hongze Cheng 已提交
202

H
Hongze Cheng 已提交
203
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
204

H
Hongze Cheng 已提交
205
_exit:
H
Hongze Cheng 已提交
206
  return code;
H
Hongze Cheng 已提交
207 208 209 210
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
211

H
Hongze Cheng 已提交
212
  SDecoder *pCoder = &(SDecoder){0};
H
Hongze Cheng 已提交
213

H
Hongze Cheng 已提交
214
  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
H
Hongze Cheng 已提交
215 216 217 218
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

X
Xiaoyu Wang 已提交
219
  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
H
Hongze Cheng 已提交
220

H
Hongze Cheng 已提交
221 222 223 224
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
225

H
Hongze Cheng 已提交
226 227 228 229 230
  uint64_t nSubmitTbData;
  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
231

H
Hongze Cheng 已提交
232 233 234 235
  int64_t ctime = taosGetTimestampMs();
  for (int32_t i = 0; i < nSubmitTbData; i++) {
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
236
  }
H
Hongze Cheng 已提交
237

H
Hongze Cheng 已提交
238
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
239

H
Hongze Cheng 已提交
240
_exit:
241
  if (code) {
242
    vError("vgId:%d, failed to preprocess submit request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
243 244
           pMsg->msgType);
  }
H
Hongze Cheng 已提交
245
  tDecoderClear(pCoder);
H
Hongze Cheng 已提交
246 247
  return code;
}
H
Hongze Cheng 已提交
248

H
Hongze Cheng 已提交
249 250 251
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

252 253 254 255 256
  int32_t    size;
  int32_t    ret;
  uint8_t   *pCont;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};
H
Haojun Liao 已提交
257

258
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Haojun Liao 已提交
259
  initStorageAPI(&handle.api);
H
Hongze Cheng 已提交
260 261 262 263 264 265 266

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  if (code) goto _exit;

  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  pCont = rpcMallocCont(size + sizeof(SMsgHead));
H
Hongze Cheng 已提交
267

H
Hongze Cheng 已提交
268 269
  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
H
Hongze Cheng 已提交
270

H
Hongze Cheng 已提交
271 272 273
  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  tEncodeDeleteRes(pCoder, &res);
  tEncoderClear(pCoder);
H
Hongze Cheng 已提交
274

H
Hongze Cheng 已提交
275 276 277
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);
H
Hongze Cheng 已提交
278

H
Hongze Cheng 已提交
279 280 281 282 283
  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
H
Hongze Cheng 已提交
284

H
Hongze Cheng 已提交
285 286 287 288 289 290 291 292 293 294 295 296
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_SUBMIT: {
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_DELETE: {
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
H
Hongze Cheng 已提交
297
    } break;
H
Hongze Cheng 已提交
298 299 300
    default:
      break;
  }
H
Hongze Cheng 已提交
301

H
Hongze Cheng 已提交
302 303
_exit:
  if (code) {
304
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
H
Hongze Cheng 已提交
305 306
           pMsg->msgType);
  }
H
Hongze Cheng 已提交
307
  return code;
H
Hongze Cheng 已提交
308 309
}

310
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
311 312 313 314
  void   *ptr = NULL;
  void   *pReq;
  int32_t len;
  int32_t ret;
315

316 317
  if (ver <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
318
           pVnode->state.applied);
319
    terrno = TSDB_CODE_VND_DUP_REQUEST;
320 321 322
    return -1;
  }

323
  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
H
Hongze Cheng 已提交
324

325
  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
326
  ASSERT(pVnode->state.applied + 1 == ver);
327

328
  atomic_store_64(&pVnode->state.applied, ver);
329
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
H
Hongze Cheng 已提交
330

331 332
  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

L
Liu Jicong 已提交
333
  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
334
    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
L
Liu Jicong 已提交
335 336
  }

H
Hongze Cheng 已提交
337 338 339
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
B
Benguang Zhao 已提交
340
  bool needCommit = false;
H
Hongze Cheng 已提交
341 342

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
343
    /* META */
H
Hongze Cheng 已提交
344
    case TDMT_VND_CREATE_STB:
345
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
346
      break;
H
Hongze Cheng 已提交
347
    case TDMT_VND_ALTER_STB:
348
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
349
      break;
H
Hongze Cheng 已提交
350
    case TDMT_VND_DROP_STB:
351
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
352
      break;
H
Hongze Cheng 已提交
353
    case TDMT_VND_CREATE_TABLE:
354
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
355 356
      break;
    case TDMT_VND_ALTER_TABLE:
357
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
358
      break;
H
Hongze Cheng 已提交
359
    case TDMT_VND_DROP_TABLE:
360
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
361
      break;
362
    case TDMT_VND_DROP_TTL_TABLE:
363
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
364
      break;
S
Shengliang Guan 已提交
365
    case TDMT_VND_TRIM:
366
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
367 368
      break;
    case TDMT_VND_CREATE_SMA:
369
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
370
      break;
H
Hongze Cheng 已提交
371
    /* TSDB */
H
Hongze Cheng 已提交
372
    case TDMT_VND_SUBMIT:
373
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
374
      break;
D
dapan1121 已提交
375
    case TDMT_VND_DELETE:
376
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
D
dapan1121 已提交
377
      break;
378
    case TDMT_VND_BATCH_DEL:
379
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
380
      break;
H
Hongze Cheng 已提交
381
    /* TQ */
L
Liu Jicong 已提交
382
    case TDMT_VND_TMQ_SUBSCRIBE:
383
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
384
        goto _err;
L
Liu Jicong 已提交
385 386
      }
      break;
L
Liu Jicong 已提交
387
    case TDMT_VND_TMQ_DELETE_SUB:
388
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
389 390 391
        goto _err;
      }
      break;
L
Liu Jicong 已提交
392
    case TDMT_VND_TMQ_COMMIT_OFFSET:
393
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
394
        goto _err;
L
Liu Jicong 已提交
395 396
      }
      break;
397
    case TDMT_VND_TMQ_SEEK_TO_OFFSET:
398
      if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
399 400 401
        goto _err;
      }
      break;
L
Liu Jicong 已提交
402
    case TDMT_VND_TMQ_ADD_CHECKINFO:
403
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
404 405 406
        goto _err;
      }
      break;
L
Liu Jicong 已提交
407
    case TDMT_VND_TMQ_DEL_CHECKINFO:
408
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
L
Liu Jicong 已提交
409 410 411
        goto _err;
      }
      break;
412
    case TDMT_STREAM_TASK_DEPLOY: {
413
      if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
414
        goto _err;
H
Hongze Cheng 已提交
415 416
      }
    } break;
L
Liu Jicong 已提交
417
    case TDMT_STREAM_TASK_DROP: {
418
      if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
419 420 421
        goto _err;
      }
    } break;
5
54liuyao 已提交
422
    case TDMT_STREAM_TASK_PAUSE: {
423
      if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
424 425 426 427
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_RESUME: {
428
      if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
429 430 431
        goto _err;
      }
    } break;
L
Liu Jicong 已提交
432
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
433
      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
434 435 436
        goto _err;
      }
    } break;
437
    case TDMT_STREAM_TASK_CHECK_RSP: {
438
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
439 440 441
        goto _err;
      }
    } break;
442
    case TDMT_VND_ALTER_CONFIRM:
443
      needCommit = pVnode->config.hashChange;
444
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
445 446
        goto _err;
      }
447
      break;
448
    case TDMT_VND_ALTER_CONFIG:
449
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
S
Shengliang Guan 已提交
450
      break;
H
Hongze Cheng 已提交
451
    case TDMT_VND_COMMIT:
B
Benguang Zhao 已提交
452 453
      needCommit = true;
      break;
dengyihao's avatar
dengyihao 已提交
454
    case TDMT_VND_CREATE_INDEX:
455
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
456 457
      break;
    case TDMT_VND_DROP_INDEX:
458
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
459
      break;
H
Hongze Cheng 已提交
460
    case TDMT_VND_COMPACT:
461
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
462
      goto _exit;
H
Hongze Cheng 已提交
463
    default:
L
Liu Jicong 已提交
464 465
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
H
Hongze Cheng 已提交
466 467
  }

S
Shengliang Guan 已提交
468
  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
469
         ver);
H
Hongze Cheng 已提交
470

471
  walApplyVer(pVnode->pWal, ver);
472

473
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
S
Shengliang Guan 已提交
474
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
475 476 477
    return -1;
  }

H
Hongze Cheng 已提交
478
  // commit if need
B
Benguang Zhao 已提交
479
  if (needCommit) {
480
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
481 482 483 484
    if (vnodeAsyncCommit(pVnode) < 0) {
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
H
Hongze Cheng 已提交
485 486

    // start a new one
487
    if (vnodeBegin(pVnode) < 0) {
H
Hongze Cheng 已提交
488 489
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
490
    }
H
Hongze Cheng 已提交
491 492
  }

H
Hongze Cheng 已提交
493
_exit:
H
Hongze Cheng 已提交
494
  return 0;
H
Hongze Cheng 已提交
495 496

_err:
497 498
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), ver);
H
Hongze Cheng 已提交
499
  return -1;
H
Hongze Cheng 已提交
500 501
}

502
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan1121 已提交
503
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
D
dapan1121 已提交
504 505 506
    return 0;
  }

507
  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
D
dapan1121 已提交
508 509 510
}

int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
511
  vTrace("message in vnode query queue is processing");
512
  if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) {
513
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
514 515 516
    return 0;
  }

517 518 519 520 521
  if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

522
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
523 524
  initStorageAPI(&handle.api);

H
Hongze Cheng 已提交
525
  switch (pMsg->msgType) {
D
dapan1121 已提交
526
    case TDMT_SCH_QUERY:
D
dapan1121 已提交
527
    case TDMT_SCH_MERGE_QUERY:
D
dapan1121 已提交
528
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
529
    case TDMT_SCH_QUERY_CONTINUE:
D
dapan1121 已提交
530
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
531 532
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
533 534
    case TDMT_VND_TMQ_CONSUME_PUSH:
      return tqProcessPollPush(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
535 536
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
537
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
538 539 540
  }
}

541
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
S
Shengliang Guan 已提交
542
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
L
Liu Jicong 已提交
543
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
544
       pMsg->msgType == TDMT_VND_BATCH_META) &&
545
      !syncIsReadyForRead(pVnode->sync)) {
546
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
547 548 549
    return 0;
  }

H
Hongze Cheng 已提交
550
  switch (pMsg->msgType) {
D
dapan1121 已提交
551
    case TDMT_SCH_FETCH:
D
dapan1121 已提交
552
    case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
553
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
554
    case TDMT_SCH_FETCH_RSP:
D
dapan1121 已提交
555
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
L
Liu Jicong 已提交
556 557
    // case TDMT_SCH_CANCEL_TASK:
    //   return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
558
    case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
559
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
560
    case TDMT_SCH_QUERY_HEARTBEAT:
D
dapan1121 已提交
561
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
562
    case TDMT_VND_TABLE_META:
D
dapan1121 已提交
563
      return vnodeGetTableMeta(pVnode, pMsg, true);
D
dapan1121 已提交
564
    case TDMT_VND_TABLE_CFG:
D
dapan1121 已提交
565 566 567
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
568 569
//    case TDMT_VND_TMQ_CONSUME:
//      return tqProcessPollReq(pVnode->pTq, pMsg);
570 571
    case TDMT_VND_TMQ_VG_WALINFO:
      return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
572 573 574
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
575
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
576 577
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
578
    case TDMT_STREAM_TASK_DISPATCH_RSP:
L
Liu Jicong 已提交
579
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
580 581
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
582 583
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
584
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
L
Liu Jicong 已提交
585 586 587 588 589
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
590 591
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
592
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
593 594 595 596
  }
}

void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
C
Cary Xu 已提交
597
  // blockDebugShowDataBlocks(data, __func__);
598
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
599 600
}

D
dapan1121 已提交
601
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
602 603 604
  if (NULL == pMetaRsp) {
    return;
  }
L
Liu Jicong 已提交
605

D
dapan1121 已提交
606 607 608 609 610 611
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

H
Hongze Cheng 已提交
612
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
613
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
614
  int32_t     code = 0;
S
Shengliang Guan 已提交
615 616
  SVTrimDbReq trimReq = {0};

H
Hongze Cheng 已提交
617 618 619 620
  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
S
Shengliang Guan 已提交
621 622
  }

C
Cary Xu 已提交
623 624
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

H
Hongze Cheng 已提交
625
  // process
H
Hongze Cheng 已提交
626
  vnodeAsyncRentention(pVnode, trimReq.timestamp);
H
Hongze Cheng 已提交
627 628
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);
C
Cary Xu 已提交
629

H
Hongze Cheng 已提交
630 631
_exit:
  return code;
S
Shengliang Guan 已提交
632 633
}

634
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
635 636 637
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
638 639 640 641 642 643
  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

644
  vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
S
Shengliang Guan 已提交
645
  int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
L
Liu Jicong 已提交
646
  if (ret != 0) {
647 648
    goto end;
  }
L
Liu Jicong 已提交
649
  if (taosArrayGetSize(tbUids) > 0) {
wmmhello's avatar
wmmhello 已提交
650 651
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }
652

H
Hongze Cheng 已提交
653
  vnodeAsyncRentention(pVnode, ttlReq.timestamp);
H
Hongze Cheng 已提交
654

655 656 657 658 659
end:
  taosArrayDestroy(tbUids);
  return ret;
}

660
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
661
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
662
  SDecoder       coder;
H
Hongze Cheng 已提交
663

H
Hongze Cheng 已提交
664 665 666 667 668 669
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
670
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
671 672

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
673 674
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
675 676
  }

677
  if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
678 679
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
680 681
  }

682
  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
683 684 685
    pRsp->code = terrno;
    goto _err;
  }
C
Cary Xu 已提交
686

H
Hongze Cheng 已提交
687
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
688
  return 0;
H
Hongze Cheng 已提交
689 690

_err:
H
Hongze Cheng 已提交
691
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
692
  return -1;
H
Hongze Cheng 已提交
693 694
}

695
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
696
  SDecoder           decoder = {0};
697
  SEncoder           encoder = {0};
698
  int32_t            rcode = 0;
H
Hongze Cheng 已提交
699 700
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
701 702 703
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
704
  STbUidStore       *pStore = NULL;
705
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
706 707

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
708 709 710
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
711

H
Hongze Cheng 已提交
712
  // decode
H
Hongze Cheng 已提交
713 714
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
715 716 717 718
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
719

H
Hongze Cheng 已提交
720
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
721 722
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
723 724 725 726 727
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
728
  // loop to create table
729
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
730
    pCreateReq = req.pReqs + iReq;
D
dapan1121 已提交
731
    memset(&cRsp, 0, sizeof(cRsp));
H
Hongze Cheng 已提交
732

C
Cary Xu 已提交
733 734 735 736
    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }
L
Liu Jicong 已提交
737

wafwerar's avatar
wafwerar 已提交
738 739 740 741 742
    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

H
Hongze Cheng 已提交
743 744 745 746 747 748 749 750 751
    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
752
    if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
H
Hongze Cheng 已提交
753 754 755 756 757
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
758
    } else {
H
Hongze Cheng 已提交
759
      cRsp.code = TSDB_CODE_SUCCESS;
760
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
761
      taosArrayPush(tbUids, &pCreateReq->uid);
L
Liu Jicong 已提交
762
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
H
Hongze Cheng 已提交
763
    }
H
Hongze Cheng 已提交
764 765

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
766 767
  }

H
Haojun Liao 已提交
768
  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
769
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
770
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
C
Cary Xu 已提交
771 772
    goto _exit;
  }
773
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
774

H
Hongze Cheng 已提交
775
  // prepare rsp
776
  int32_t ret = 0;
wafwerar's avatar
wafwerar 已提交
777
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
778 779 780 781 782 783
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
784 785
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
H
Hongze Cheng 已提交
786

H
Hongze Cheng 已提交
787
_exit:
wmmhello's avatar
wmmhello 已提交
788 789
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
790
    taosMemoryFree(pCreateReq->comment);
wmmhello's avatar
wmmhello 已提交
791 792
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
793
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
794
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
795 796
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
797
  return rcode;
H
Hongze Cheng 已提交
798 799
}

800
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
816
  }
H
Hongze Cheng 已提交
817

818
  if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
819 820 821 822 823 824 825
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
826 827 828
  return 0;
}

829
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
830
  SVDropStbReq req = {0};
831
  int32_t      rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
832
  SDecoder     decoder = {0};
833
  SArray      *tbUidList = NULL;
H
Hongze Cheng 已提交
834 835 836 837 838 839

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
840 841
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
842 843 844 845 846
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
847 848
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) goto _exit;
849
  if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
850 851 852 853 854
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
855 856 857
    rcode = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
858

859 860 861 862 863
  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
864 865
  // return rsp
_exit:
866
  if (tbUidList) taosArrayDestroy(tbUidList);
H
Hongze Cheng 已提交
867
  pRsp->code = rcode;
H
Hongze Cheng 已提交
868
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
869 870 871
  return 0;
}

872
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
D
dapan1121 已提交
873 874 875
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
876 877
  int32_t       rcode = 0;
  int32_t       ret;
D
dapan1121 已提交
878 879
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
H
Hongze Cheng 已提交
880 881 882 883 884 885 886 887 888 889

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
890
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
891
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
892 893
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
894 895 896
  }

  // process
897
  if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
898
    vAlterTbRsp.code = terrno;
H
Hongze Cheng 已提交
899
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
900 901
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
902 903
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
904

D
dapan1121 已提交
905 906 907 908 909
  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

H
Hongze Cheng 已提交
910 911 912 913 914 915
_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
M
Minglei Jin 已提交
916 917 918
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
  }
H
Hongze Cheng 已提交
919 920 921
  return 0;
}

922
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
923 924
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
925
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
926
  SEncoder         encoder = {0};
927
  int32_t          ret;
928
  SArray          *tbUids = NULL;
929
  STbUidStore     *pStore = NULL;
H
Hongze Cheng 已提交
930

H
Hongze Cheng 已提交
931
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
932 933 934
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
935 936

  // decode req
H
Hongze Cheng 已提交
937 938
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
939 940 941 942 943
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
944 945

  // process req
946
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
H
Hongze Cheng 已提交
947
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
948 949
  if (tbUids == NULL || rsp.pArray == NULL) goto _exit;

950
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
951 952
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
953
    tb_uid_t     tbUid = 0;
H
Hongze Cheng 已提交
954

H
Hongze Cheng 已提交
955
    /* code */
956
    ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
H
Hongze Cheng 已提交
957
    if (ret < 0) {
958
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
H
Hongze Cheng 已提交
959 960 961 962
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
963
    } else {
H
Hongze Cheng 已提交
964
      dropTbRsp.code = TSDB_CODE_SUCCESS;
965
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
H
Hongze Cheng 已提交
966 967 968 969 970
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

971
  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
972
  tdUpdateTbUidList(pVnode->pSma, pStore, false);
973

H
Hongze Cheng 已提交
974
_exit:
975
  taosArrayDestroy(tbUids);
976
  tdUidStoreFree(pStore);
H
Hongze Cheng 已提交
977
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
978 979 980 981 982
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
983
  taosArrayDestroy(rsp.pArray);
H
Hongze Cheng 已提交
984 985 986
  return 0;
}

987 988
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
D
dapan 已提交
989 990 991 992
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;
C
Cary Xu 已提交
993
  int32_t        rv = -1;
D
dapan 已提交
994 995 996

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
997 998 999 1000 1001

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema) {
    suid = msgIter->suid;
    rv = TD_ROW_SVER(blkIter.row);
D
dapan 已提交
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030
typedef struct SSubmitReqConvertCxt {
  SSubmitMsgIter msgIter;
  SSubmitBlk    *pBlock;
  SSubmitBlkIter blkIter;
  STSRow        *pRow;
  STSRowIter     rowIter;
  SSubmitTbData *pTbData;
  STSchema      *pTbSchema;
  SArray        *pColValues;
} SSubmitReqConvertCxt;

static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1031 1032
  pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid > 0 ? pCxt->msgIter.suid : pCxt->msgIter.uid,
                                     pCxt->msgIter.sversion, 1);
1033 1034 1035 1036 1037
  if (NULL == pCxt->pTbSchema) {
    return TSDB_CODE_INVALID_MSG;
  }
  tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);

1038
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
  if (NULL == pCxt->pTbData) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pCxt->pTbData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  pCxt->pTbData->flags = 0;
  pCxt->pTbData->suid = pCxt->msgIter.suid;
  pCxt->pTbData->uid = pCxt->msgIter.uid;
  pCxt->pTbData->sver = pCxt->msgIter.sversion;
  pCxt->pTbData->pCreateTbReq = NULL;
  pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (NULL == pCxt->pTbData->aRowP) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
  if (NULL == pCxt->pColValues) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
    SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
    taosArrayPush(pCxt->pColValues, &val);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1070
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087
  taosMemoryFreeClear(pCxt->pTbData);
  taosArrayDestroy(pCxt->pColValues);
}

static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
    return TSDB_CODE_SUCCESS;
  }

  if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pColVal->value.nData = varDataLen(pCellVal->val);
1088
    pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126
  } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
    float f = GET_FLOAT_VAL(pCellVal->val);
    memcpy(&pColVal->value.val, &f, sizeof(f));
  } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
    pColVal->value.val = *(int64_t *)pCellVal->val;
  } else {
    GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
  }

  pColVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  int32_t code = TSDB_CODE_SUCCESS;
  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
  for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) {
    STColumn *pCol = pCxt->pTbSchema->columns + i;
    SCellVal  cellVal = {0};
    if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) {
      break;
    }
    code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i));
  }
  return code;
}

static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  if (pCxt->msgIter.schemaLen <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pCxt->pTbData->pCreateTbReq) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDecoder decoder = {0};
1127
  tDecoderInit(&decoder, (uint8_t *)pCxt->pBlock->data, pCxt->msgIter.schemaLen);
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
  int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq);
  tDecoderClear(&decoder);

  return code;
}

static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}

static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;
1180
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
1181 1182 1183 1184 1185 1186 1187 1188
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
1189
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
1190
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
1191 1192 1193 1194 1195 1196 1197 1198
    tEncoderClear(&encoder);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  }
  return code;
}

1199
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1200
  int32_t code = 0;
D
dapan1121 已提交
1201
  terrno = 0;
C
Cary Xu 已提交
1202

H
Hongze Cheng 已提交
1203
  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
H
Hongze Cheng 已提交
1204
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
H
Hongze Cheng 已提交
1205
  SArray      *newTbUids = NULL;
H
Hongze Cheng 已提交
1206 1207 1208 1209
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
1210

1211
  void           *pAllocMsg = NULL;
1212
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
1213
  if (0 == pMsg->version) {
1214 1215 1216 1217
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
1218 1219 1220
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
1221 1222 1223 1224 1225 1226 1227 1228 1229
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
1230
    if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
1231 1232 1233 1234
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    tDecoderClear(&dc);
D
dapan 已提交
1235
  }
C
Cary Xu 已提交
1236

H
Hongze Cheng 已提交
1237 1238 1239 1240 1241 1242 1243
  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

H
Hongze Cheng 已提交
1244 1245 1246 1247 1248
    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
      TSKEY    *aKey = (TSKEY *)(pColData->pData);

      for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
        if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
          code = TSDB_CODE_INVALID_MSG;
1261
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272
          goto _exit;
        }
      }

    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
          code = TSDB_CODE_INVALID_MSG;
1273
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1274 1275 1276 1277 1278 1279
          goto _exit;
        }
      }
    }
  }

H
Hongze Cheng 已提交
1280
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
D
dapan1121 已提交
1291
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
H
Hongze Cheng 已提交
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (info.suid) {
        metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }
H
Hongze Cheng 已提交
1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

1325 1326
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
H
Hongze Cheng 已提交
1327 1328 1329 1330 1331
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
H
Hongze Cheng 已提交
1332 1333
  }

L
Liu Jicong 已提交
1334 1335
  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

H
Hongze Cheng 已提交
1336
  // loop to handle
H
Hongze Cheng 已提交
1337
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1338 1339 1340 1341
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
H
Hongze Cheng 已提交
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
H
Hongze Cheng 已提交
1353
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1354 1355 1356 1357 1358 1359
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);

      // create table
1360
      if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
1361
        // create table success
H
Hongze Cheng 已提交
1362 1363 1364

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
H
Hongze Cheng 已提交
1365
          code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1366 1367 1368 1369 1370 1371
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
D
dapan1121 已提交
1372
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
H
Hongze Cheng 已提交
1373 1374 1375 1376 1377 1378
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
X
Xiaoyu Wang 已提交
1379
        terrno = 0;
1380
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
H
Hongze Cheng 已提交
1381
      }
H
Hongze Cheng 已提交
1382 1383 1384
    }

    // insert data
H
Hongze Cheng 已提交
1385
    int32_t affectedRows;
1386
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
H
Hongze Cheng 已提交
1387 1388 1389
    if (code) goto _exit;

    pSubmitRsp->affectedRows += affectedRows;
H
Hongze Cheng 已提交
1390
  }
H
Hongze Cheng 已提交
1391

1392
  // update the affected table uid list
H
Hongze Cheng 已提交
1393 1394 1395 1396
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
H
Hongze Cheng 已提交
1397
  }
H
Hongze Cheng 已提交
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSSubmitRsp2(&ec, pSubmitRsp);
  tEncoderClear(&ec);

H
Hongze Cheng 已提交
1408 1409 1410 1411 1412 1413
  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
1414
    tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
H
Hongze Cheng 已提交
1415 1416
  }

H
Hongze Cheng 已提交
1417 1418
  // clear
  taosArrayDestroy(newTbUids);
1419
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
H
Hongze Cheng 已提交
1420
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
H
Hongze Cheng 已提交
1421

D
dapan1121 已提交
1422 1423
  if (code) terrno = code;

1424
  taosMemoryFree(pAllocMsg);
1425

H
Hongze Cheng 已提交
1426
  return code;
L
Liu Jicong 已提交
1427
}
1428

1429
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1430
  SVCreateTSmaReq req = {0};
C
Cary Xu 已提交
1431
  SDecoder        coder = {0};
1432

C
Cary Xu 已提交
1433 1434 1435 1436 1437 1438
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }
1439 1440 1441 1442 1443

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
C
Cary Xu 已提交
1444 1445
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
1446
    goto _err;
1447
  }
C
Cary Xu 已提交
1448

1449
  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
C
Cary Xu 已提交
1450
    if (pRsp) pRsp->code = terrno;
1451
    goto _err;
1452
  }
C
Cary Xu 已提交
1453

1454
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1455
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
1456
         req.indexName, req.indexUid, ver, req.tableUid);
H
Hongze Cheng 已提交
1457
  return 0;
1458 1459 1460

_err:
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1461
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
1462
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
1463
  return -1;
L
Liu Jicong 已提交
1464
}
C
Cary Xu 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476

/**
 * @brief specific for smaDstVnode
 *
 * @param pVnode
 * @param pCont
 * @param contLen
 * @return int32_t
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
1477

1478
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
B
Benguang Zhao 已提交
1479 1480 1481
  int32_t code = TSDB_CODE_SUCCESS;

  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
1482
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
B
Benguang Zhao 已提交
1483 1484

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
M
Minglei Jin 已提交
1485
  code = metaTrimTables(pVnode->pMeta);
B
Benguang Zhao 已提交
1486 1487 1488 1489

  return code;
}

1490
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
B
Benguang Zhao 已提交
1491 1492 1493 1494 1495 1496
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
  int32_t code = TSDB_CODE_SUCCESS;
  if (!pVnode->config.hashChange) {
    goto _exit;
  }

1497
  code = vnodeConsolidateAlterHashRange(pVnode, ver);
1498
  if (code < 0) {
1499
    vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
1500 1501
    goto _exit;
  }
B
Benguang Zhao 已提交
1502 1503 1504
  pVnode->config.hashChange = false;

_exit:
1505
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
B
Benguang Zhao 已提交
1506
  pRsp->code = code;
1507 1508 1509
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

1510
  return code;
1511
}
S
Shengliang Guan 已提交
1512

1513
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
1514 1515
  bool walChanged = false;
  bool tsdbChanged = false;
1516

S
Shengliang Guan 已提交
1517 1518
  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
1519 1520 1521 1522
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

1523
  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
1524 1525
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
        "walRetentionSize:%d",
1526 1527
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
1528
        req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);
1529 1530 1531

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
1532 1533
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }
1534

1535
  if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
S
Shengliang Guan 已提交
1536
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
1537
          (uint64_t)(req.buffer * 1024LL * 1024LL));
1538
    pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
H
Hongze Cheng 已提交
1539 1540
  }

1541 1542
  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
S
Shengliang Guan 已提交
1543
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
1544
             pVnode->config.szCache, req.pages, tstrerror(errno));
H
Hongze Cheng 已提交
1545 1546
      return errno;
    } else {
S
Shengliang Guan 已提交
1547
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
1548
      pVnode->config.szCache = req.pages;
H
Hongze Cheng 已提交
1549
    }
H
Hongze Cheng 已提交
1550 1551
  }

1552 1553
  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
1554 1555
  }

1556 1557
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
1558 1559 1560
    walChanged = true;
  }

1561 1562
  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
1563 1564 1565 1566 1567 1568 1569
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }
1570

1571 1572
  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
1573 1574 1575
    walChanged = true;
  }

1576 1577
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
1578
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1579
      tsdbChanged = true;
1580 1581 1582
    }
  }

1583 1584
  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
1585
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1586
      tsdbChanged = true;
1587 1588 1589
    }
  }

1590 1591
  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
1592
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1593
      tsdbChanged = true;
1594 1595 1596
    }
  }

1597 1598 1599 1600 1601 1602 1603 1604
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    pVnode->config.sttTrigger = req.sttTrigger;
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

1605
  if (walChanged) {
M
Minglei Jin 已提交
1606 1607 1608 1609 1610
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
1611 1612
  }

1613 1614 1615
  return 0;
}

1616
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1617 1618 1619 1620 1621
  SBatchDeleteReq deleteReq;
  SDecoder        decoder;
  tDecoderInit(&decoder, pReq, len);
  tDecodeSBatchDeleteReq(&decoder, &deleteReq);

1622
  SMetaReader mr = {0};
1623
  metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
1624

1625 1626 1627
  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
1628 1629
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
S
Shengliang Guan 已提交
1630
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
1631 1632 1633 1634 1635
      continue;
    }

    int64_t uid = mr.me.uid;

1636
    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1637 1638 1639
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
L
Liu Jicong 已提交
1640
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1641
    }
1642 1643

    tDecoderClear(&mr.coder);
1644
  }
1645
  metaReaderClear(&mr);
1646
  taosArrayDestroy(deleteReq.deleteReqs);
1647 1648 1649
  return 0;
}

1650
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1651 1652 1653 1654
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

wmmhello's avatar
wmmhello 已提交
1655 1656 1657 1658 1659
  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
1660 1661 1662 1663 1664 1665 1666 1667
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
1668
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
H
Hongze Cheng 已提交
1669 1670

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
1671
    code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
H
Hongze Cheng 已提交
1672 1673 1674 1675 1676 1677
                               pRes->skey, pRes->ekey);
    if (code) goto _err;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
wmmhello's avatar
wmmhello 已提交
1678 1679

  SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
L
Liu Jicong 已提交
1680
  int32_t     ret = 0;
wmmhello's avatar
wmmhello 已提交
1681 1682
  tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
L
Liu Jicong 已提交
1683
  SEncoder ec = {0};
wmmhello's avatar
wmmhello 已提交
1684 1685 1686
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVDeleteRsp(&ec, &rsp);
  tEncoderClear(&ec);
H
Hongze Cheng 已提交
1687 1688 1689 1690
  return code;

_err:
  return code;
M
Minglei Jin 已提交
1691
}
1692
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);
  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }
1708
  if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1709 1710 1711 1712 1713 1714 1715 1716
    pRsp->code = terrno;
    goto _err;
  }
  tDecoderClear(&dc);
  return 0;
_err:
  tDecoderClear(&dc);
  return -1;
dengyihao's avatar
dengyihao 已提交
1717
}
1718
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1719
  SDropIndexReq req = {0};
dengyihao's avatar
dengyihao 已提交
1720
  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
dengyihao's avatar
dengyihao 已提交
1721 1722 1723 1724 1725 1726 1727 1728
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSDropIdxReq(pReq, len, &req)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1729

1730
  if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1731 1732 1733
    pRsp->code = terrno;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1734 1735
  return TSDB_CODE_SUCCESS;
}
1736

1737
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
1738

1739 1740
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
1741
}
1742

H
Hongze Cheng 已提交
1743
#ifndef TD_ENTERPRISE
1744
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1745
  return 0;
dengyihao's avatar
dengyihao 已提交
1746
}
H
Hongze Cheng 已提交
1747
#endif