vnodeSvr.c 57.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tencode.h"
#include "tmsg.h"
H
Hongze Cheng 已提交
18
#include "vnd.h"
H
Hongze Cheng 已提交
19 20
#include "vnode.h"
#include "vnodeInt.h"
H
Hongze Cheng 已提交
21

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
39

40
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
  int32_t code = 0;
  int32_t lino = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // name
  char *name = NULL;
  if (tDecodeCStr(pCoder, &name) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // uid
  int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
  if (uid == 0) {
    uid = tGenIdPI64();
  }
  *(int64_t *)(pCoder->data + pCoder->pos) = uid;

69 70
  // btime
  *(int64_t *)(pCoder->data + pCoder->pos + 8) = btime;
H
Hongze Cheng 已提交
71 72 73 74 75 76 77 78

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
H
Hongze Cheng 已提交
79
    if (pUid) *pUid = uid;
H
Hongze Cheng 已提交
80 81 82
  }
  return code;
}
H
Hongze Cheng 已提交
83 84 85 86
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

87
  int64_t  btime = taosGetTimestampMs();
H
Hongze Cheng 已提交
88
  SDecoder dc = {0};
H
Hongze Cheng 已提交
89
  int32_t  nReqs;
H
Hongze Cheng 已提交
90

H
Hongze Cheng 已提交
91 92 93 94 95 96 97 98 99 100 101
  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
102
    code = vnodePreprocessCreateTableReq(pVnode, &dc, btime, NULL);
H
Hongze Cheng 已提交
103
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
104 105 106 107 108 109
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
D
dapan1121 已提交
110 111 112 113
  if (code) {
    vError("vgId:%d, %s:%d failed to preprocess submit request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
  }
H
Hongze Cheng 已提交
114 115
  return code;
}
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

static int32_t vnodePreProcessAlterTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = TSDB_CODE_INVALID_MSG;
  int32_t lino = 0;

  SDecoder dc = {0};
  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  SVAlterTbReq vAlterTbReq = {0};
  int64_t      ctimeMs = taosGetTimestampMs();
  if (tDecodeSVAlterTbReqSetCtime(&dc, &vAlterTbReq, ctimeMs) < 0) {
    goto _exit;
  }

  code = 0;

_exit:
  tDecoderClear(&dc);
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, vAlterTbReq.tbName,
           ctimeMs);
  }
  return code;
}

H
Hongze Cheng 已提交
143
extern int64_t tsMaxKeyByPrecision[];
144
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
H
Hongze Cheng 已提交
145 146 147
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
148 149 150 151
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
152

H
Hongze Cheng 已提交
153 154 155 156 157
  SSubmitTbData submitTbData;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
158

H
Hongze Cheng 已提交
159
  int64_t uid;
H
Hongze Cheng 已提交
160
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
161
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
H
Hongze Cheng 已提交
162 163 164 165 166 167 168 169
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
170 171 172

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    *(int64_t *)(pCoder->data + pCoder->pos) = uid;
H
Hongze Cheng 已提交
173
    pCoder->pos += sizeof(int64_t);
H
Hongze Cheng 已提交
174
  } else {
H
Hongze Cheng 已提交
175 176 177 178
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
179
  }
H
Hongze Cheng 已提交
180

H
Hongze Cheng 已提交
181
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
H
Hongze Cheng 已提交
182 183 184 185
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
186
  // scan and check
187
  TSKEY now = btimeMs;
H
Hongze Cheng 已提交
188 189 190 191 192 193 194 195 196 197
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
H
Hongze Cheng 已提交
198
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
199
      goto _exit;
H
Hongze Cheng 已提交
200 201
    }

H
Hongze Cheng 已提交
202 203
    SColData colData = {0};
    pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
H
Hongze Cheng 已提交
204 205 206 207 208
    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
209 210 211 212 213 214
    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
215 216 217 218

    for (uint64_t i = 1; i < nColData; i++) {
      pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
    }
H
Hongze Cheng 已提交
219 220 221
  } else {
    uint64_t nRow;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
H
Hongze Cheng 已提交
222
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
223
      goto _exit;
H
Hongze Cheng 已提交
224
    }
H
Hongze Cheng 已提交
225

H
Hongze Cheng 已提交
226 227 228
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
H
Hongze Cheng 已提交
229

H
Hongze Cheng 已提交
230 231 232
      if (pRow->ts < minKey || pRow->ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
H
Hongze Cheng 已提交
233
      }
H
Hongze Cheng 已提交
234 235
    }
  }
H
Hongze Cheng 已提交
236

S
Shungang Li 已提交
237 238 239 240
  if (!tDecodeIsEnd(pCoder)) {
    *(int64_t *)(pCoder->data + pCoder->pos) = ctimeMs;
    pCoder->pos += sizeof(int64_t);
  }
241

H
Hongze Cheng 已提交
242
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
243

H
Hongze Cheng 已提交
244
_exit:
H
Hongze Cheng 已提交
245
  return code;
H
Hongze Cheng 已提交
246 247 248 249
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
250

H
Hongze Cheng 已提交
251
  SDecoder *pCoder = &(SDecoder){0};
H
Hongze Cheng 已提交
252

H
Hongze Cheng 已提交
253
  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
H
Hongze Cheng 已提交
254 255 256 257
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

X
Xiaoyu Wang 已提交
258
  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
H
Hongze Cheng 已提交
259

H
Hongze Cheng 已提交
260 261 262 263
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
264

H
Hongze Cheng 已提交
265 266 267 268 269
  uint64_t nSubmitTbData;
  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
270

271 272
  int64_t btimeMs = taosGetTimestampMs();
  int64_t ctimeMs = btimeMs;
H
Hongze Cheng 已提交
273
  for (int32_t i = 0; i < nSubmitTbData; i++) {
274
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, btimeMs, ctimeMs);
H
Hongze Cheng 已提交
275
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
276
  }
H
Hongze Cheng 已提交
277

H
Hongze Cheng 已提交
278
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
279

H
Hongze Cheng 已提交
280
_exit:
D
dapan1121 已提交
281
  tDecoderClear(pCoder);
282
  if (code) {
D
dapan1121 已提交
283 284
    vError("vgId:%d, %s:%d failed to preprocess submit request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
285
  }
H
Hongze Cheng 已提交
286 287
  return code;
}
H
Hongze Cheng 已提交
288

H
Hongze Cheng 已提交
289 290 291
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

292 293 294 295 296
  int32_t    size;
  int32_t    ret;
  uint8_t   *pCont;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};
H
Haojun Liao 已提交
297

298
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Haojun Liao 已提交
299
  initStorageAPI(&handle.api);
H
Hongze Cheng 已提交
300 301 302 303

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  if (code) goto _exit;

304
  res.ctimeMs = taosGetTimestampMs();
H
Hongze Cheng 已提交
305 306 307
  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  pCont = rpcMallocCont(size + sizeof(SMsgHead));
H
Hongze Cheng 已提交
308

H
Hongze Cheng 已提交
309 310
  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
H
Hongze Cheng 已提交
311

H
Hongze Cheng 已提交
312 313 314
  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  tEncodeDeleteRes(pCoder, &res);
  tEncoderClear(pCoder);
H
Hongze Cheng 已提交
315

H
Hongze Cheng 已提交
316 317 318
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320 321 322 323 324
  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
H
Hongze Cheng 已提交
325

326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t         ctimeMs = taosGetTimestampMs();
  SBatchDeleteReq pReq = {0};
  SDecoder       *pCoder = &(SDecoder){0};

  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  if (tDecodeSBatchDeleteReqSetCtime(pCoder, &pReq, ctimeMs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pReq.deleteReqs);

  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, ctimeMs);
  }
  return code;
}

H
Hongze Cheng 已提交
351 352 353 354 355 356 357
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
    } break;
358 359 360
    case TDMT_VND_ALTER_TABLE: {
      code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
    } break;
H
Hongze Cheng 已提交
361 362 363 364 365
    case TDMT_VND_SUBMIT: {
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_DELETE: {
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
H
Hongze Cheng 已提交
366
    } break;
367 368 369
    case TDMT_VND_BATCH_DEL: {
      code = vnodePreProcessBatchDeleteMsg(pVnode, pMsg);
    } break;
H
Hongze Cheng 已提交
370 371 372
    default:
      break;
  }
H
Hongze Cheng 已提交
373

H
Hongze Cheng 已提交
374 375
_exit:
  if (code) {
D
dapan1121 已提交
376 377
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code),
           TMSG_INFO(pMsg->msgType));
H
Hongze Cheng 已提交
378
  }
H
Hongze Cheng 已提交
379
  return code;
H
Hongze Cheng 已提交
380 381
}

382
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
383 384 385 386
  void   *ptr = NULL;
  void   *pReq;
  int32_t len;
  int32_t ret;
387

388 389
  if (ver <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
390
           pVnode->state.applied);
391
    terrno = TSDB_CODE_VND_DUP_REQUEST;
392 393 394
    return -1;
  }

395
  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
H
Hongze Cheng 已提交
396

397
  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
398
  ASSERT(pVnode->state.applied + 1 == ver);
399

400
  atomic_store_64(&pVnode->state.applied, ver);
401
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
H
Hongze Cheng 已提交
402

403 404
  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

L
Liu Jicong 已提交
405
  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
406
    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
L
Liu Jicong 已提交
407 408
  }

H
Hongze Cheng 已提交
409 410 411
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
B
Benguang Zhao 已提交
412
  bool needCommit = false;
H
Hongze Cheng 已提交
413 414

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
415
    /* META */
H
Hongze Cheng 已提交
416
    case TDMT_VND_CREATE_STB:
417
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
418
      break;
H
Hongze Cheng 已提交
419
    case TDMT_VND_ALTER_STB:
420
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
421
      break;
H
Hongze Cheng 已提交
422
    case TDMT_VND_DROP_STB:
423
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
424
      break;
H
Hongze Cheng 已提交
425
    case TDMT_VND_CREATE_TABLE:
426
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
427 428
      break;
    case TDMT_VND_ALTER_TABLE:
429
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
430
      break;
H
Hongze Cheng 已提交
431
    case TDMT_VND_DROP_TABLE:
432
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
433
      break;
434
    case TDMT_VND_DROP_TTL_TABLE:
435
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
436
      break;
S
Shengliang Guan 已提交
437
    case TDMT_VND_TRIM:
438
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
439 440
      break;
    case TDMT_VND_CREATE_SMA:
441
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
442
      break;
H
Hongze Cheng 已提交
443
    /* TSDB */
H
Hongze Cheng 已提交
444
    case TDMT_VND_SUBMIT:
445
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
446
      break;
D
dapan1121 已提交
447
    case TDMT_VND_DELETE:
448
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
D
dapan1121 已提交
449
      break;
450
    case TDMT_VND_BATCH_DEL:
451
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
452
      break;
H
Hongze Cheng 已提交
453
    /* TQ */
L
Liu Jicong 已提交
454
    case TDMT_VND_TMQ_SUBSCRIBE:
455
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
456
        goto _err;
L
Liu Jicong 已提交
457 458
      }
      break;
L
Liu Jicong 已提交
459
    case TDMT_VND_TMQ_DELETE_SUB:
460
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
461 462 463
        goto _err;
      }
      break;
L
Liu Jicong 已提交
464
    case TDMT_VND_TMQ_COMMIT_OFFSET:
465
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
466
        goto _err;
L
Liu Jicong 已提交
467 468
      }
      break;
L
Liu Jicong 已提交
469
    case TDMT_VND_TMQ_ADD_CHECKINFO:
470
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
471 472 473
        goto _err;
      }
      break;
L
Liu Jicong 已提交
474
    case TDMT_VND_TMQ_DEL_CHECKINFO:
475
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
L
Liu Jicong 已提交
476 477 478
        goto _err;
      }
      break;
479
    case TDMT_STREAM_TASK_DEPLOY: {
480
      if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
481
        goto _err;
H
Hongze Cheng 已提交
482 483
      }
    } break;
L
Liu Jicong 已提交
484
    case TDMT_STREAM_TASK_DROP: {
485
      if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
486 487 488
        goto _err;
      }
    } break;
5
54liuyao 已提交
489
    case TDMT_STREAM_TASK_PAUSE: {
490
      if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
491 492 493 494
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_RESUME: {
495
      if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
496 497 498
        goto _err;
      }
    } break;
L
Liu Jicong 已提交
499
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
500
      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
501 502 503
        goto _err;
      }
    } break;
504
    case TDMT_STREAM_TASK_CHECK_RSP: {
505
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
506 507 508
        goto _err;
      }
    } break;
509
    case TDMT_VND_ALTER_CONFIRM:
510
      needCommit = pVnode->config.hashChange;
511
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
512 513
        goto _err;
      }
514
      break;
515
    case TDMT_VND_ALTER_CONFIG:
516
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
S
Shengliang Guan 已提交
517
      break;
H
Hongze Cheng 已提交
518
    case TDMT_VND_COMMIT:
B
Benguang Zhao 已提交
519 520
      needCommit = true;
      break;
dengyihao's avatar
dengyihao 已提交
521
    case TDMT_VND_CREATE_INDEX:
522
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
523 524
      break;
    case TDMT_VND_DROP_INDEX:
525
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
526
      break;
H
Hongze Cheng 已提交
527
    case TDMT_VND_COMPACT:
528
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
529
      goto _exit;
H
Hongze Cheng 已提交
530
    default:
L
Liu Jicong 已提交
531 532
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
H
Hongze Cheng 已提交
533 534
  }

S
Shengliang Guan 已提交
535
  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
536
         ver);
H
Hongze Cheng 已提交
537

538
  walApplyVer(pVnode->pWal, ver);
539

540
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
S
Shengliang Guan 已提交
541
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
542 543 544
    return -1;
  }

H
Hongze Cheng 已提交
545
  // commit if need
B
Benguang Zhao 已提交
546
  if (needCommit) {
547
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
548 549 550 551
    if (vnodeAsyncCommit(pVnode) < 0) {
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
H
Hongze Cheng 已提交
552 553

    // start a new one
554
    if (vnodeBegin(pVnode) < 0) {
H
Hongze Cheng 已提交
555 556
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
557
    }
H
Hongze Cheng 已提交
558 559
  }

H
Hongze Cheng 已提交
560
_exit:
H
Hongze Cheng 已提交
561
  return 0;
H
Hongze Cheng 已提交
562 563

_err:
564 565
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), ver);
H
Hongze Cheng 已提交
566
  return -1;
H
Hongze Cheng 已提交
567 568
}

569
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan1121 已提交
570
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
D
dapan1121 已提交
571 572 573
    return 0;
  }

574
  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
D
dapan1121 已提交
575 576 577
}

int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
578
  vTrace("message in vnode query queue is processing");
H
Hongze Cheng 已提交
579 580 581
  if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
       pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) &&
      !syncIsReadyForRead(pVnode->sync)) {
582
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
583 584 585
    return 0;
  }

586 587 588 589 590
  if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

591
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
592 593
  initStorageAPI(&handle.api);

H
Hongze Cheng 已提交
594
  switch (pMsg->msgType) {
D
dapan1121 已提交
595
    case TDMT_SCH_QUERY:
D
dapan1121 已提交
596
    case TDMT_SCH_MERGE_QUERY:
D
dapan1121 已提交
597
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
598
    case TDMT_SCH_QUERY_CONTINUE:
D
dapan1121 已提交
599
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
600 601
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
602 603
    case TDMT_VND_TMQ_CONSUME_PUSH:
      return tqProcessPollPush(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
604 605
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
606
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
607 608 609
  }
}

610
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
S
Shengliang Guan 已提交
611
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
L
Liu Jicong 已提交
612
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
613
       pMsg->msgType == TDMT_VND_BATCH_META) &&
614
      !syncIsReadyForRead(pVnode->sync)) {
615
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
616 617 618
    return 0;
  }

H
Hongze Cheng 已提交
619
  switch (pMsg->msgType) {
D
dapan1121 已提交
620
    case TDMT_SCH_FETCH:
D
dapan1121 已提交
621
    case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
622
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
623
    case TDMT_SCH_FETCH_RSP:
D
dapan1121 已提交
624
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
L
Liu Jicong 已提交
625 626
    // case TDMT_SCH_CANCEL_TASK:
    //   return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
627
    case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
628
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
629
    case TDMT_SCH_QUERY_HEARTBEAT:
D
dapan1121 已提交
630
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
631
    case TDMT_VND_TABLE_META:
D
dapan1121 已提交
632
      return vnodeGetTableMeta(pVnode, pMsg, true);
D
dapan1121 已提交
633
    case TDMT_VND_TABLE_CFG:
D
dapan1121 已提交
634 635 636
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
H
Hongze Cheng 已提交
637 638
      //    case TDMT_VND_TMQ_CONSUME:
      //      return tqProcessPollReq(pVnode->pTq, pMsg);
639 640
    case TDMT_VND_TMQ_VG_WALINFO:
      return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
641 642
    case TDMT_VND_TMQ_SEEK:
      return tqProcessSeekReq(pVnode->pTq, pMsg);
643 644 645
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
646
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
647 648
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
649
    case TDMT_STREAM_TASK_DISPATCH_RSP:
L
Liu Jicong 已提交
650
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
651 652
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
653 654
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
655
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
L
Liu Jicong 已提交
656 657 658 659 660
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
661 662
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
663
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
664 665 666 667
  }
}

void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
C
Cary Xu 已提交
668
  // blockDebugShowDataBlocks(data, __func__);
669
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
670 671
}

D
dapan1121 已提交
672
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
673 674 675
  if (NULL == pMetaRsp) {
    return;
  }
L
Liu Jicong 已提交
676

D
dapan1121 已提交
677 678 679 680 681 682
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

H
Hongze Cheng 已提交
683
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
684
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
685
  int32_t     code = 0;
S
Shengliang Guan 已提交
686 687
  SVTrimDbReq trimReq = {0};

H
Hongze Cheng 已提交
688 689 690 691
  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
S
Shengliang Guan 已提交
692 693
  }

C
Cary Xu 已提交
694 695
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

H
Hongze Cheng 已提交
696
  // process
H
Hongze Cheng 已提交
697
  vnodeAsyncRentention(pVnode, trimReq.timestamp);
H
Hongze Cheng 已提交
698 699
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);
C
Cary Xu 已提交
700

H
Hongze Cheng 已提交
701 702
_exit:
  return code;
S
Shengliang Guan 已提交
703 704
}

705
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
706 707 708
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
709 710 711 712 713 714
  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

715 716
  vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
  int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
L
Liu Jicong 已提交
717
  if (ret != 0) {
718 719
    goto end;
  }
L
Liu Jicong 已提交
720
  if (taosArrayGetSize(tbUids) > 0) {
wmmhello's avatar
wmmhello 已提交
721 722
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }
723

724
  vnodeAsyncRentention(pVnode, ttlReq.timestampSec);
H
Hongze Cheng 已提交
725

726 727 728 729 730
end:
  taosArrayDestroy(tbUids);
  return ret;
}

731
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
732
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
733
  SDecoder       coder;
H
Hongze Cheng 已提交
734

H
Hongze Cheng 已提交
735 736 737 738 739 740
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
741
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
742 743

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
744 745
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
746 747
  }

748
  if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
749 750
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
751 752
  }

753
  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
754 755 756
    pRsp->code = terrno;
    goto _err;
  }
C
Cary Xu 已提交
757

H
Hongze Cheng 已提交
758
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
759
  return 0;
H
Hongze Cheng 已提交
760 761

_err:
H
Hongze Cheng 已提交
762
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
763
  return -1;
H
Hongze Cheng 已提交
764 765
}

766
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
767
  SDecoder           decoder = {0};
768
  SEncoder           encoder = {0};
769
  int32_t            rcode = 0;
H
Hongze Cheng 已提交
770 771
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
772 773 774
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
775
  STbUidStore       *pStore = NULL;
776
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
777 778

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
779 780 781
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
782

H
Hongze Cheng 已提交
783
  // decode
H
Hongze Cheng 已提交
784 785
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
786 787 788 789
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
790

H
Hongze Cheng 已提交
791
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
792 793
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
794 795 796 797 798
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
799
  // loop to create table
800
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
801
    pCreateReq = req.pReqs + iReq;
D
dapan1121 已提交
802
    memset(&cRsp, 0, sizeof(cRsp));
H
Hongze Cheng 已提交
803

C
Cary Xu 已提交
804 805 806 807
    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }
L
Liu Jicong 已提交
808

wafwerar's avatar
wafwerar 已提交
809 810 811 812 813
    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

H
Hongze Cheng 已提交
814 815 816 817 818 819 820 821 822
    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
823
    if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
H
Hongze Cheng 已提交
824 825 826 827 828
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
829
    } else {
H
Hongze Cheng 已提交
830
      cRsp.code = TSDB_CODE_SUCCESS;
831
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
832
      taosArrayPush(tbUids, &pCreateReq->uid);
L
Liu Jicong 已提交
833
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
H
Hongze Cheng 已提交
834
    }
H
Hongze Cheng 已提交
835 836

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
837 838
  }

H
Haojun Liao 已提交
839
  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
840
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
841
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
C
Cary Xu 已提交
842 843
    goto _exit;
  }
844
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
845

H
Hongze Cheng 已提交
846
  // prepare rsp
847
  int32_t ret = 0;
wafwerar's avatar
wafwerar 已提交
848
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
849 850 851 852 853 854
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
855 856
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
H
Hongze Cheng 已提交
857

H
Hongze Cheng 已提交
858
_exit:
wmmhello's avatar
wmmhello 已提交
859 860
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
861
    taosMemoryFree(pCreateReq->comment);
wmmhello's avatar
wmmhello 已提交
862 863
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
864
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
865
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
866 867
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
868
  return rcode;
H
Hongze Cheng 已提交
869 870
}

871
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
872 873 874 875 876 877 878 879 880 881 882 883 884 885 886
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
887
  }
H
Hongze Cheng 已提交
888

889
  if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
890 891 892 893 894 895 896
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
897 898 899
  return 0;
}

900
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
901
  SVDropStbReq req = {0};
902
  int32_t      rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
903
  SDecoder     decoder = {0};
904
  SArray      *tbUidList = NULL;
H
Hongze Cheng 已提交
905 906 907 908 909 910

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
911 912
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
913 914 915 916 917
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
918 919
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) goto _exit;
920
  if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
921 922 923 924 925
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
926 927 928
    rcode = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
929

930 931 932 933 934
  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
935 936
  // return rsp
_exit:
937
  if (tbUidList) taosArrayDestroy(tbUidList);
H
Hongze Cheng 已提交
938
  pRsp->code = rcode;
H
Hongze Cheng 已提交
939
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
940 941 942
  return 0;
}

943
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
D
dapan1121 已提交
944 945 946
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
947 948
  int32_t       rcode = 0;
  int32_t       ret;
D
dapan1121 已提交
949 950
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
H
Hongze Cheng 已提交
951 952 953 954 955 956 957 958 959 960

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
961
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
962
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
963 964
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
965 966 967
  }

  // process
968
  if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
969
    vAlterTbRsp.code = terrno;
H
Hongze Cheng 已提交
970
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
971 972
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
973 974
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
975

D
dapan1121 已提交
976 977 978 979 980
  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

H
Hongze Cheng 已提交
981 982 983 984 985 986
_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
M
Minglei Jin 已提交
987 988 989
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
  }
H
Hongze Cheng 已提交
990 991 992
  return 0;
}

993
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
994 995
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
996
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
997
  SEncoder         encoder = {0};
998
  int32_t          ret;
999
  SArray          *tbUids = NULL;
1000
  STbUidStore     *pStore = NULL;
H
Hongze Cheng 已提交
1001

H
Hongze Cheng 已提交
1002
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
1003 1004 1005
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
1006 1007

  // decode req
H
Hongze Cheng 已提交
1008 1009
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
1010 1011 1012 1013 1014
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
1015 1016

  // process req
1017
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
H
Hongze Cheng 已提交
1018
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
1019 1020
  if (tbUids == NULL || rsp.pArray == NULL) goto _exit;

1021
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
1022 1023
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
1024
    tb_uid_t     tbUid = 0;
H
Hongze Cheng 已提交
1025

H
Hongze Cheng 已提交
1026
    /* code */
1027
    ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
H
Hongze Cheng 已提交
1028
    if (ret < 0) {
1029
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
H
Hongze Cheng 已提交
1030 1031 1032 1033
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
1034
    } else {
H
Hongze Cheng 已提交
1035
      dropTbRsp.code = TSDB_CODE_SUCCESS;
1036
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
H
Hongze Cheng 已提交
1037 1038 1039 1040 1041
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

1042
  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
1043
  tdUpdateTbUidList(pVnode->pSma, pStore, false);
1044

H
Hongze Cheng 已提交
1045
_exit:
1046
  taosArrayDestroy(tbUids);
1047
  tdUidStoreFree(pStore);
H
Hongze Cheng 已提交
1048
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
1049 1050 1051 1052 1053
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
1054
  taosArrayDestroy(rsp.pArray);
H
Hongze Cheng 已提交
1055 1056 1057
  return 0;
}

1058 1059
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
D
dapan 已提交
1060 1061 1062 1063
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;
C
Cary Xu 已提交
1064
  int32_t        rv = -1;
D
dapan 已提交
1065 1066 1067

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
1068 1069 1070 1071 1072

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema) {
    suid = msgIter->suid;
    rv = TD_ROW_SVER(blkIter.row);
D
dapan 已提交
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
typedef struct SSubmitReqConvertCxt {
  SSubmitMsgIter msgIter;
  SSubmitBlk    *pBlock;
  SSubmitBlkIter blkIter;
  STSRow        *pRow;
  STSRowIter     rowIter;
  SSubmitTbData *pTbData;
  STSchema      *pTbSchema;
  SArray        *pColValues;
} SSubmitReqConvertCxt;

static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1102 1103
  pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid > 0 ? pCxt->msgIter.suid : pCxt->msgIter.uid,
                                     pCxt->msgIter.sversion, 1);
1104 1105 1106 1107 1108
  if (NULL == pCxt->pTbSchema) {
    return TSDB_CODE_INVALID_MSG;
  }
  tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);

1109
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140
  if (NULL == pCxt->pTbData) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pCxt->pTbData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  pCxt->pTbData->flags = 0;
  pCxt->pTbData->suid = pCxt->msgIter.suid;
  pCxt->pTbData->uid = pCxt->msgIter.uid;
  pCxt->pTbData->sver = pCxt->msgIter.sversion;
  pCxt->pTbData->pCreateTbReq = NULL;
  pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (NULL == pCxt->pTbData->aRowP) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
  if (NULL == pCxt->pColValues) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
    SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
    taosArrayPush(pCxt->pColValues, &val);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1141
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158
  taosMemoryFreeClear(pCxt->pTbData);
  taosArrayDestroy(pCxt->pColValues);
}

static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
    return TSDB_CODE_SUCCESS;
  }

  if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pColVal->value.nData = varDataLen(pCellVal->val);
1159
    pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197
  } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
    float f = GET_FLOAT_VAL(pCellVal->val);
    memcpy(&pColVal->value.val, &f, sizeof(f));
  } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
    pColVal->value.val = *(int64_t *)pCellVal->val;
  } else {
    GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
  }

  pColVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  int32_t code = TSDB_CODE_SUCCESS;
  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
  for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) {
    STColumn *pCol = pCxt->pTbSchema->columns + i;
    SCellVal  cellVal = {0};
    if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) {
      break;
    }
    code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i));
  }
  return code;
}

static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  if (pCxt->msgIter.schemaLen <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pCxt->pTbData->pCreateTbReq) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDecoder decoder = {0};
1198
  tDecoderInit(&decoder, (uint8_t *)pCxt->pBlock->data, pCxt->msgIter.schemaLen);
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250
  int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq);
  tDecoderClear(&decoder);

  return code;
}

static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}

static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;
1251
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
1252 1253 1254 1255 1256 1257 1258 1259
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
1260
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
1261
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
1262 1263 1264 1265 1266 1267 1268 1269
    tEncoderClear(&encoder);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  }
  return code;
}

1270
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1271
  int32_t code = 0;
D
dapan1121 已提交
1272
  terrno = 0;
C
Cary Xu 已提交
1273

H
Hongze Cheng 已提交
1274
  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
H
Hongze Cheng 已提交
1275
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
H
Hongze Cheng 已提交
1276
  SArray      *newTbUids = NULL;
H
Hongze Cheng 已提交
1277 1278 1279 1280
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
1281

1282
  void           *pAllocMsg = NULL;
1283
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
1284
  if (0 == pMsg->version) {
1285 1286 1287 1288
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
1289 1290 1291
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
1292 1293 1294 1295 1296 1297 1298 1299 1300
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
1301
    if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
1302 1303 1304 1305
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    tDecoderClear(&dc);
D
dapan 已提交
1306
  }
C
Cary Xu 已提交
1307

H
Hongze Cheng 已提交
1308 1309 1310 1311 1312 1313 1314
  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

H
Hongze Cheng 已提交
1315 1316 1317 1318 1319
    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
      TSKEY    *aKey = (TSKEY *)(pColData->pData);

      for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
        if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
          code = TSDB_CODE_INVALID_MSG;
1332
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
          goto _exit;
        }
      }

    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
          code = TSDB_CODE_INVALID_MSG;
1344
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1345 1346 1347 1348 1349 1350
          goto _exit;
        }
      }
    }
  }

H
Hongze Cheng 已提交
1351
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
D
dapan1121 已提交
1362
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
H
Hongze Cheng 已提交
1363 1364 1365 1366 1367 1368 1369 1370 1371
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (info.suid) {
H
Hongze Cheng 已提交
1372 1373
        code = metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
        ASSERT(code == 0);
H
Hongze Cheng 已提交
1374 1375 1376 1377 1378 1379 1380
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }
H
Hongze Cheng 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

1397 1398
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
H
Hongze Cheng 已提交
1399 1400 1401 1402 1403
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
H
Hongze Cheng 已提交
1404 1405
  }

L
Liu Jicong 已提交
1406 1407
  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

H
Hongze Cheng 已提交
1408
  // loop to handle
H
Hongze Cheng 已提交
1409
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1410 1411 1412 1413
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
H
Hongze Cheng 已提交
1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
H
Hongze Cheng 已提交
1425
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1426 1427 1428 1429 1430 1431
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);

      // create table
1432
      if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
1433
        // create table success
H
Hongze Cheng 已提交
1434 1435 1436

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
H
Hongze Cheng 已提交
1437
          code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1438 1439 1440 1441 1442 1443
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
D
dapan1121 已提交
1444
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
H
Hongze Cheng 已提交
1445 1446 1447 1448 1449 1450
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
X
Xiaoyu Wang 已提交
1451
        terrno = 0;
1452
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
H
Hongze Cheng 已提交
1453
      }
H
Hongze Cheng 已提交
1454 1455 1456
    }

    // insert data
H
Hongze Cheng 已提交
1457
    int32_t affectedRows;
1458
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
H
Hongze Cheng 已提交
1459 1460
    if (code) goto _exit;

1461 1462 1463
    code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
    if (code) goto _exit;

H
Hongze Cheng 已提交
1464
    pSubmitRsp->affectedRows += affectedRows;
H
Hongze Cheng 已提交
1465
  }
H
Hongze Cheng 已提交
1466

1467
  // update the affected table uid list
H
Hongze Cheng 已提交
1468 1469 1470 1471
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
H
Hongze Cheng 已提交
1472
  }
H
Hongze Cheng 已提交
1473 1474 1475 1476 1477 1478 1479 1480 1481 1482

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSSubmitRsp2(&ec, pSubmitRsp);
  tEncoderClear(&ec);

H
Hongze Cheng 已提交
1483 1484 1485 1486 1487 1488
  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
1489
    tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
H
Hongze Cheng 已提交
1490 1491
  }

H
Hongze Cheng 已提交
1492 1493
  // clear
  taosArrayDestroy(newTbUids);
1494
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
H
Hongze Cheng 已提交
1495
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
H
Hongze Cheng 已提交
1496

D
dapan1121 已提交
1497 1498
  if (code) terrno = code;

1499
  taosMemoryFree(pAllocMsg);
1500

H
Hongze Cheng 已提交
1501
  return code;
L
Liu Jicong 已提交
1502
}
1503

1504
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1505
  SVCreateTSmaReq req = {0};
C
Cary Xu 已提交
1506
  SDecoder        coder = {0};
1507

C
Cary Xu 已提交
1508 1509 1510 1511 1512 1513
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }
1514 1515 1516 1517 1518

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
C
Cary Xu 已提交
1519 1520
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
1521
    goto _err;
1522
  }
C
Cary Xu 已提交
1523

1524
  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
C
Cary Xu 已提交
1525
    if (pRsp) pRsp->code = terrno;
1526
    goto _err;
1527
  }
C
Cary Xu 已提交
1528

1529
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1530
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
1531
         req.indexName, req.indexUid, ver, req.tableUid);
H
Hongze Cheng 已提交
1532
  return 0;
1533 1534 1535

_err:
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1536
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
1537
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
1538
  return -1;
L
Liu Jicong 已提交
1539
}
C
Cary Xu 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551

/**
 * @brief specific for smaDstVnode
 *
 * @param pVnode
 * @param pCont
 * @param contLen
 * @return int32_t
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
1552

1553
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
B
Benguang Zhao 已提交
1554 1555 1556
  int32_t code = TSDB_CODE_SUCCESS;

  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
1557
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
B
Benguang Zhao 已提交
1558 1559

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
M
Minglei Jin 已提交
1560
  code = metaTrimTables(pVnode->pMeta);
B
Benguang Zhao 已提交
1561 1562 1563 1564

  return code;
}

1565
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
B
Benguang Zhao 已提交
1566 1567 1568 1569 1570 1571
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
  int32_t code = TSDB_CODE_SUCCESS;
  if (!pVnode->config.hashChange) {
    goto _exit;
  }

1572
  code = vnodeConsolidateAlterHashRange(pVnode, ver);
1573
  if (code < 0) {
1574
    vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
1575 1576
    goto _exit;
  }
B
Benguang Zhao 已提交
1577 1578 1579
  pVnode->config.hashChange = false;

_exit:
1580
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
B
Benguang Zhao 已提交
1581
  pRsp->code = code;
1582 1583 1584
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

1585
  return code;
1586
}
S
Shengliang Guan 已提交
1587

1588
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
1589 1590
  bool walChanged = false;
  bool tsdbChanged = false;
1591

S
Shengliang Guan 已提交
1592 1593
  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
1594 1595 1596 1597
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

1598
  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
1599 1600
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
        "walRetentionSize:%d",
1601 1602
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
1603
        req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);
1604 1605 1606

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
1607 1608
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }
1609

1610
  if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
S
Shengliang Guan 已提交
1611
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
1612
          (uint64_t)(req.buffer * 1024LL * 1024LL));
1613
    pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
H
Hongze Cheng 已提交
1614 1615
  }

1616 1617
  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
S
Shengliang Guan 已提交
1618
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
1619
             pVnode->config.szCache, req.pages, tstrerror(errno));
H
Hongze Cheng 已提交
1620 1621
      return errno;
    } else {
S
Shengliang Guan 已提交
1622
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
1623
      pVnode->config.szCache = req.pages;
H
Hongze Cheng 已提交
1624
    }
H
Hongze Cheng 已提交
1625 1626
  }

1627 1628
  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
1629 1630
  }

1631 1632
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
1633 1634 1635
    walChanged = true;
  }

1636 1637
  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
1638 1639 1640 1641 1642 1643 1644
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }
1645

1646 1647
  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
1648 1649 1650
    walChanged = true;
  }

1651 1652
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
1653
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1654
      tsdbChanged = true;
1655 1656 1657
    }
  }

1658 1659
  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
1660
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1661
      tsdbChanged = true;
1662 1663 1664
    }
  }

1665 1666
  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
1667
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1668
      tsdbChanged = true;
1669 1670 1671
    }
  }

1672 1673 1674 1675 1676 1677 1678 1679
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    pVnode->config.sttTrigger = req.sttTrigger;
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

1680
  if (walChanged) {
M
Minglei Jin 已提交
1681 1682 1683 1684 1685
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
1686 1687
  }

1688 1689 1690
  return 0;
}

1691
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1692 1693 1694 1695 1696
  SBatchDeleteReq deleteReq;
  SDecoder        decoder;
  tDecoderInit(&decoder, pReq, len);
  tDecodeSBatchDeleteReq(&decoder, &deleteReq);

1697
  SMetaReader mr = {0};
1698
  metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
1699

1700 1701 1702
  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
1703 1704
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
S
Shengliang Guan 已提交
1705
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
1706 1707 1708 1709 1710
      continue;
    }

    int64_t uid = mr.me.uid;

1711
    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1712 1713 1714
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
L
Liu Jicong 已提交
1715
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1716
    }
1717

1718 1719 1720 1721 1722 1723 1724 1725
    code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
             ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

1726
    tDecoderClear(&mr.coder);
1727
  }
1728
  metaReaderClear(&mr);
1729
  taosArrayDestroy(deleteReq.deleteReqs);
1730 1731 1732
  return 0;
}

1733
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1734 1735 1736 1737
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

wmmhello's avatar
wmmhello 已提交
1738 1739 1740 1741 1742
  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
1743 1744 1745 1746 1747 1748 1749 1750
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
1751
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
H
Hongze Cheng 已提交
1752 1753

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
1754 1755 1756 1757
    uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
    code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
    if (code) goto _err;
    code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs);
H
Hongze Cheng 已提交
1758 1759 1760 1761 1762
    if (code) goto _err;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
wmmhello's avatar
wmmhello 已提交
1763 1764

  SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
L
Liu Jicong 已提交
1765
  int32_t     ret = 0;
wmmhello's avatar
wmmhello 已提交
1766 1767
  tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
L
Liu Jicong 已提交
1768
  SEncoder ec = {0};
wmmhello's avatar
wmmhello 已提交
1769 1770 1771
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVDeleteRsp(&ec, &rsp);
  tEncoderClear(&ec);
H
Hongze Cheng 已提交
1772 1773 1774 1775
  return code;

_err:
  return code;
M
Minglei Jin 已提交
1776
}
1777
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);
  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }
1793
  if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1794 1795 1796 1797 1798 1799 1800 1801
    pRsp->code = terrno;
    goto _err;
  }
  tDecoderClear(&dc);
  return 0;
_err:
  tDecoderClear(&dc);
  return -1;
dengyihao's avatar
dengyihao 已提交
1802
}
1803
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1804
  SDropIndexReq req = {0};
dengyihao's avatar
dengyihao 已提交
1805
  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
dengyihao's avatar
dengyihao 已提交
1806 1807 1808 1809 1810 1811 1812 1813
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSDropIdxReq(pReq, len, &req)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1814

1815
  if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1816 1817 1818
    pRsp->code = terrno;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1819 1820
  return TSDB_CODE_SUCCESS;
}
1821

1822
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
1823

1824 1825
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
1826
}
1827

H
Hongze Cheng 已提交
1828
#ifndef TD_ENTERPRISE
1829
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1830
  return 0;
dengyihao's avatar
dengyihao 已提交
1831
}
H
Hongze Cheng 已提交
1832
#endif