vnodeSvr.c 57.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tencode.h"
#include "tmsg.h"
H
Hongze Cheng 已提交
18
#include "vnd.h"
H
Hongze Cheng 已提交
19 20
#include "vnode.h"
#include "vnodeInt.h"
H
Hongze Cheng 已提交
21

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
39

40
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
  int32_t code = 0;
  int32_t lino = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // name
  char *name = NULL;
  if (tDecodeCStr(pCoder, &name) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // uid
  int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
  if (uid == 0) {
    uid = tGenIdPI64();
  }
  *(int64_t *)(pCoder->data + pCoder->pos) = uid;

69 70
  // btime
  *(int64_t *)(pCoder->data + pCoder->pos + 8) = btime;
H
Hongze Cheng 已提交
71 72 73 74 75 76 77 78

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
H
Hongze Cheng 已提交
79
    if (pUid) *pUid = uid;
H
Hongze Cheng 已提交
80 81 82
  }
  return code;
}
H
Hongze Cheng 已提交
83 84 85 86
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

87
  int64_t  btime = taosGetTimestampMs();
H
Hongze Cheng 已提交
88
  SDecoder dc = {0};
H
Hongze Cheng 已提交
89
  int32_t  nReqs;
H
Hongze Cheng 已提交
90

H
Hongze Cheng 已提交
91 92 93 94 95 96 97 98 99 100 101
  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
102
    code = vnodePreprocessCreateTableReq(pVnode, &dc, btime, NULL);
H
Hongze Cheng 已提交
103
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
104 105 106 107 108 109
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
D
dapan1121 已提交
110 111 112 113
  if (code) {
    vError("vgId:%d, %s:%d failed to preprocess submit request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
  }
H
Hongze Cheng 已提交
114 115
  return code;
}
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

static int32_t vnodePreProcessAlterTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = TSDB_CODE_INVALID_MSG;
  int32_t lino = 0;

  SDecoder dc = {0};
  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  SVAlterTbReq vAlterTbReq = {0};
  int64_t      ctimeMs = taosGetTimestampMs();
  if (tDecodeSVAlterTbReqSetCtime(&dc, &vAlterTbReq, ctimeMs) < 0) {
    goto _exit;
  }

  code = 0;

_exit:
  tDecoderClear(&dc);
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, vAlterTbReq.tbName,
           ctimeMs);
  }
  return code;
}

H
Hongze Cheng 已提交
143
extern int64_t tsMaxKeyByPrecision[];
144
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
H
Hongze Cheng 已提交
145 146 147
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
148 149 150 151
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
152

H
Hongze Cheng 已提交
153 154 155 156 157
  SSubmitTbData submitTbData;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
158

H
Hongze Cheng 已提交
159
  int64_t uid;
H
Hongze Cheng 已提交
160
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
161
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
H
Hongze Cheng 已提交
162 163 164 165 166 167 168 169
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
170 171 172

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    *(int64_t *)(pCoder->data + pCoder->pos) = uid;
H
Hongze Cheng 已提交
173
    pCoder->pos += sizeof(int64_t);
H
Hongze Cheng 已提交
174
  } else {
H
Hongze Cheng 已提交
175 176 177 178
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
179
  }
H
Hongze Cheng 已提交
180

H
Hongze Cheng 已提交
181
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
H
Hongze Cheng 已提交
182 183 184 185
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
186
  // scan and check
187
  TSKEY now = btimeMs;
H
Hongze Cheng 已提交
188 189 190 191 192 193 194 195 196 197
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
H
Hongze Cheng 已提交
198
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
199
      goto _exit;
H
Hongze Cheng 已提交
200 201
    }

H
Hongze Cheng 已提交
202 203
    SColData colData = {0};
    pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
H
Hongze Cheng 已提交
204 205 206 207 208
    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
209 210 211 212 213 214
    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
215 216 217 218

    for (uint64_t i = 1; i < nColData; i++) {
      pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
    }
H
Hongze Cheng 已提交
219 220 221
  } else {
    uint64_t nRow;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
H
Hongze Cheng 已提交
222
      code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
223
      goto _exit;
H
Hongze Cheng 已提交
224
    }
H
Hongze Cheng 已提交
225

H
Hongze Cheng 已提交
226 227 228
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
H
Hongze Cheng 已提交
229

H
Hongze Cheng 已提交
230 231 232
      if (pRow->ts < minKey || pRow->ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
H
Hongze Cheng 已提交
233
      }
H
Hongze Cheng 已提交
234 235
    }
  }
H
Hongze Cheng 已提交
236

S
Shungang Li 已提交
237 238 239 240
  if (!tDecodeIsEnd(pCoder)) {
    *(int64_t *)(pCoder->data + pCoder->pos) = ctimeMs;
    pCoder->pos += sizeof(int64_t);
  }
241

H
Hongze Cheng 已提交
242
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
243

H
Hongze Cheng 已提交
244
_exit:
H
Hongze Cheng 已提交
245
  return code;
H
Hongze Cheng 已提交
246 247 248 249
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
250

H
Hongze Cheng 已提交
251
  SDecoder *pCoder = &(SDecoder){0};
H
Hongze Cheng 已提交
252

H
Hongze Cheng 已提交
253
  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
H
Hongze Cheng 已提交
254 255 256 257
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

X
Xiaoyu Wang 已提交
258
  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
H
Hongze Cheng 已提交
259

H
Hongze Cheng 已提交
260 261 262 263
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
264

H
Hongze Cheng 已提交
265 266 267 268 269
  uint64_t nSubmitTbData;
  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
270

271 272
  int64_t btimeMs = taosGetTimestampMs();
  int64_t ctimeMs = btimeMs;
H
Hongze Cheng 已提交
273
  for (int32_t i = 0; i < nSubmitTbData; i++) {
274
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, btimeMs, ctimeMs);
H
Hongze Cheng 已提交
275
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
276
  }
H
Hongze Cheng 已提交
277

H
Hongze Cheng 已提交
278
  tEndDecode(pCoder);
H
Hongze Cheng 已提交
279

H
Hongze Cheng 已提交
280
_exit:
D
dapan1121 已提交
281
  tDecoderClear(pCoder);
282
  if (code) {
D
dapan1121 已提交
283 284
    vError("vgId:%d, %s:%d failed to preprocess submit request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
285
  }
H
Hongze Cheng 已提交
286 287
  return code;
}
H
Hongze Cheng 已提交
288

H
Hongze Cheng 已提交
289 290 291
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

292 293 294 295 296
  int32_t    size;
  int32_t    ret;
  uint8_t   *pCont;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};
H
Haojun Liao 已提交
297

298
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Haojun Liao 已提交
299
  initStorageAPI(&handle.api);
H
Hongze Cheng 已提交
300 301 302 303

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  if (code) goto _exit;

304
  res.ctimeMs = taosGetTimestampMs();
H
Hongze Cheng 已提交
305 306 307
  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  pCont = rpcMallocCont(size + sizeof(SMsgHead));
H
Hongze Cheng 已提交
308

H
Hongze Cheng 已提交
309 310
  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
H
Hongze Cheng 已提交
311

H
Hongze Cheng 已提交
312 313 314
  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  tEncodeDeleteRes(pCoder, &res);
  tEncoderClear(pCoder);
H
Hongze Cheng 已提交
315

H
Hongze Cheng 已提交
316 317 318
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320 321 322 323 324
  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
H
Hongze Cheng 已提交
325

326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t         ctimeMs = taosGetTimestampMs();
  SBatchDeleteReq pReq = {0};
  SDecoder       *pCoder = &(SDecoder){0};

  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  if (tDecodeSBatchDeleteReqSetCtime(pCoder, &pReq, ctimeMs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pReq.deleteReqs);

  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, ctimeMs);
  }
  return code;
}

H
Hongze Cheng 已提交
351 352 353 354 355 356 357
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
    } break;
358 359 360
    case TDMT_VND_ALTER_TABLE: {
      code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
    } break;
H
Hongze Cheng 已提交
361 362 363 364 365
    case TDMT_VND_SUBMIT: {
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_DELETE: {
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
H
Hongze Cheng 已提交
366
    } break;
367 368 369
    case TDMT_VND_BATCH_DEL: {
      code = vnodePreProcessBatchDeleteMsg(pVnode, pMsg);
    } break;
H
Hongze Cheng 已提交
370 371 372
    default:
      break;
  }
H
Hongze Cheng 已提交
373

H
Hongze Cheng 已提交
374 375
_exit:
  if (code) {
D
dapan1121 已提交
376 377
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code),
           TMSG_INFO(pMsg->msgType));
H
Hongze Cheng 已提交
378
  }
H
Hongze Cheng 已提交
379
  return code;
H
Hongze Cheng 已提交
380 381
}

382
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
383 384 385 386
  void   *ptr = NULL;
  void   *pReq;
  int32_t len;
  int32_t ret;
387

388 389
  if (ver <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
390
           pVnode->state.applied);
391
    terrno = TSDB_CODE_VND_DUP_REQUEST;
392 393 394
    return -1;
  }

395
  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
H
Hongze Cheng 已提交
396

397
  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
398
  ASSERT(pVnode->state.applied + 1 == ver);
399

400
  atomic_store_64(&pVnode->state.applied, ver);
401
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
H
Hongze Cheng 已提交
402

403 404
  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

L
Liu Jicong 已提交
405
  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
406
    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
L
Liu Jicong 已提交
407 408
  }

H
Hongze Cheng 已提交
409 410 411
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
B
Benguang Zhao 已提交
412
  bool needCommit = false;
H
Hongze Cheng 已提交
413 414

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
415
    /* META */
H
Hongze Cheng 已提交
416
    case TDMT_VND_CREATE_STB:
417
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
418
      break;
H
Hongze Cheng 已提交
419
    case TDMT_VND_ALTER_STB:
420
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
421
      break;
H
Hongze Cheng 已提交
422
    case TDMT_VND_DROP_STB:
423
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
424
      break;
H
Hongze Cheng 已提交
425
    case TDMT_VND_CREATE_TABLE:
426
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
427 428
      break;
    case TDMT_VND_ALTER_TABLE:
429
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
430
      break;
H
Hongze Cheng 已提交
431
    case TDMT_VND_DROP_TABLE:
432
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
433
      break;
434
    case TDMT_VND_DROP_TTL_TABLE:
435
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
436
      break;
S
Shengliang Guan 已提交
437
    case TDMT_VND_TRIM:
438
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
439 440
      break;
    case TDMT_VND_CREATE_SMA:
441
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
S
Shengliang Guan 已提交
442
      break;
H
Hongze Cheng 已提交
443
    /* TSDB */
H
Hongze Cheng 已提交
444
    case TDMT_VND_SUBMIT:
445
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
446
      break;
D
dapan1121 已提交
447
    case TDMT_VND_DELETE:
448
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
D
dapan1121 已提交
449
      break;
450
    case TDMT_VND_BATCH_DEL:
451
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
452
      break;
H
Hongze Cheng 已提交
453
    /* TQ */
L
Liu Jicong 已提交
454
    case TDMT_VND_TMQ_SUBSCRIBE:
455
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
456
        goto _err;
L
Liu Jicong 已提交
457 458
      }
      break;
L
Liu Jicong 已提交
459
    case TDMT_VND_TMQ_DELETE_SUB:
460
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
461 462 463
        goto _err;
      }
      break;
L
Liu Jicong 已提交
464
    case TDMT_VND_TMQ_COMMIT_OFFSET:
465
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
466
        goto _err;
L
Liu Jicong 已提交
467 468
      }
      break;
469
    case TDMT_VND_TMQ_SEEK_TO_OFFSET:
470
      if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
471 472 473
        goto _err;
      }
      break;
L
Liu Jicong 已提交
474
    case TDMT_VND_TMQ_ADD_CHECKINFO:
475
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
476 477 478
        goto _err;
      }
      break;
L
Liu Jicong 已提交
479
    case TDMT_VND_TMQ_DEL_CHECKINFO:
480
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
L
Liu Jicong 已提交
481 482 483
        goto _err;
      }
      break;
484
    case TDMT_STREAM_TASK_DEPLOY: {
485
      if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
486
        goto _err;
H
Hongze Cheng 已提交
487 488
      }
    } break;
L
Liu Jicong 已提交
489
    case TDMT_STREAM_TASK_DROP: {
490
      if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
491 492 493
        goto _err;
      }
    } break;
5
54liuyao 已提交
494
    case TDMT_STREAM_TASK_PAUSE: {
495
      if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
496 497 498 499
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_RESUME: {
500
      if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
5
54liuyao 已提交
501 502 503
        goto _err;
      }
    } break;
L
Liu Jicong 已提交
504
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
505
      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
L
Liu Jicong 已提交
506 507 508
        goto _err;
      }
    } break;
509
    case TDMT_STREAM_TASK_CHECK_RSP: {
510
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
511 512 513
        goto _err;
      }
    } break;
514
    case TDMT_VND_ALTER_CONFIRM:
515
      needCommit = pVnode->config.hashChange;
516
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
517 518
        goto _err;
      }
519
      break;
520
    case TDMT_VND_ALTER_CONFIG:
521
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
S
Shengliang Guan 已提交
522
      break;
H
Hongze Cheng 已提交
523
    case TDMT_VND_COMMIT:
B
Benguang Zhao 已提交
524 525
      needCommit = true;
      break;
dengyihao's avatar
dengyihao 已提交
526
    case TDMT_VND_CREATE_INDEX:
527
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
528 529
      break;
    case TDMT_VND_DROP_INDEX:
530
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
dengyihao's avatar
dengyihao 已提交
531
      break;
H
Hongze Cheng 已提交
532
    case TDMT_VND_COMPACT:
533
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
534
      goto _exit;
H
Hongze Cheng 已提交
535
    default:
L
Liu Jicong 已提交
536 537
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
H
Hongze Cheng 已提交
538 539
  }

S
Shengliang Guan 已提交
540
  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
541
         ver);
H
Hongze Cheng 已提交
542

543
  walApplyVer(pVnode->pWal, ver);
544

545
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
S
Shengliang Guan 已提交
546
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
547 548 549
    return -1;
  }

H
Hongze Cheng 已提交
550
  // commit if need
B
Benguang Zhao 已提交
551
  if (needCommit) {
552
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
553 554 555 556
    if (vnodeAsyncCommit(pVnode) < 0) {
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
H
Hongze Cheng 已提交
557 558

    // start a new one
559
    if (vnodeBegin(pVnode) < 0) {
H
Hongze Cheng 已提交
560 561
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
562
    }
H
Hongze Cheng 已提交
563 564
  }

H
Hongze Cheng 已提交
565
_exit:
H
Hongze Cheng 已提交
566
  return 0;
H
Hongze Cheng 已提交
567 568

_err:
569 570
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), ver);
H
Hongze Cheng 已提交
571
  return -1;
H
Hongze Cheng 已提交
572 573
}

574
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan1121 已提交
575
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
D
dapan1121 已提交
576 577 578
    return 0;
  }

579
  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
D
dapan1121 已提交
580 581 582
}

int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
583
  vTrace("message in vnode query queue is processing");
584
  if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) {
585
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
586 587 588
    return 0;
  }

589 590 591 592 593
  if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

594
  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
595 596
  initStorageAPI(&handle.api);

H
Hongze Cheng 已提交
597
  switch (pMsg->msgType) {
D
dapan1121 已提交
598
    case TDMT_SCH_QUERY:
D
dapan1121 已提交
599
    case TDMT_SCH_MERGE_QUERY:
D
dapan1121 已提交
600
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
601
    case TDMT_SCH_QUERY_CONTINUE:
D
dapan1121 已提交
602
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
603 604
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
605 606
    case TDMT_VND_TMQ_CONSUME_PUSH:
      return tqProcessPollPush(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
607 608
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
609
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
610 611 612
  }
}

613
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
S
Shengliang Guan 已提交
614
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
L
Liu Jicong 已提交
615
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
616
       pMsg->msgType == TDMT_VND_BATCH_META) &&
617
      !syncIsReadyForRead(pVnode->sync)) {
618
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
619 620 621
    return 0;
  }

H
Hongze Cheng 已提交
622
  switch (pMsg->msgType) {
D
dapan1121 已提交
623
    case TDMT_SCH_FETCH:
D
dapan1121 已提交
624
    case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
625
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
626
    case TDMT_SCH_FETCH_RSP:
D
dapan1121 已提交
627
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
L
Liu Jicong 已提交
628 629
    // case TDMT_SCH_CANCEL_TASK:
    //   return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
630
    case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
631
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
D
dapan1121 已提交
632
    case TDMT_SCH_QUERY_HEARTBEAT:
D
dapan1121 已提交
633
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
H
Hongze Cheng 已提交
634
    case TDMT_VND_TABLE_META:
D
dapan1121 已提交
635
      return vnodeGetTableMeta(pVnode, pMsg, true);
D
dapan1121 已提交
636
    case TDMT_VND_TABLE_CFG:
D
dapan1121 已提交
637 638 639
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
640 641
//    case TDMT_VND_TMQ_CONSUME:
//      return tqProcessPollReq(pVnode->pTq, pMsg);
642 643
    case TDMT_VND_TMQ_VG_WALINFO:
      return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
644 645 646
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
L
Liu Jicong 已提交
647
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
648 649
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
650
    case TDMT_STREAM_TASK_DISPATCH_RSP:
L
Liu Jicong 已提交
651
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
652 653
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
654 655
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
L
Liu Jicong 已提交
656
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
L
Liu Jicong 已提交
657 658 659 660 661
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
H
Hongze Cheng 已提交
662 663
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
664
      return TSDB_CODE_APP_ERROR;
H
Hongze Cheng 已提交
665 666 667 668
  }
}

void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
C
Cary Xu 已提交
669
  // blockDebugShowDataBlocks(data, __func__);
670
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
671 672
}

D
dapan1121 已提交
673
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
674 675 676
  if (NULL == pMetaRsp) {
    return;
  }
L
Liu Jicong 已提交
677

D
dapan1121 已提交
678 679 680 681 682 683
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

H
Hongze Cheng 已提交
684
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
685
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
686
  int32_t     code = 0;
S
Shengliang Guan 已提交
687 688
  SVTrimDbReq trimReq = {0};

H
Hongze Cheng 已提交
689 690 691 692
  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
S
Shengliang Guan 已提交
693 694
  }

C
Cary Xu 已提交
695 696
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

H
Hongze Cheng 已提交
697
  // process
H
Hongze Cheng 已提交
698
  vnodeAsyncRentention(pVnode, trimReq.timestamp);
H
Hongze Cheng 已提交
699 700
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);
C
Cary Xu 已提交
701

H
Hongze Cheng 已提交
702 703
_exit:
  return code;
S
Shengliang Guan 已提交
704 705
}

706
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
707 708 709
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
710 711 712 713 714 715
  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

716 717
  vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
  int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
L
Liu Jicong 已提交
718
  if (ret != 0) {
719 720
    goto end;
  }
L
Liu Jicong 已提交
721
  if (taosArrayGetSize(tbUids) > 0) {
wmmhello's avatar
wmmhello 已提交
722 723
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }
724

725
  vnodeAsyncRentention(pVnode, ttlReq.timestampSec);
H
Hongze Cheng 已提交
726

727 728 729 730 731
end:
  taosArrayDestroy(tbUids);
  return ret;
}

732
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
733
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
734
  SDecoder       coder;
H
Hongze Cheng 已提交
735

H
Hongze Cheng 已提交
736 737 738 739 740 741
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
742
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
743 744

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
745 746
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
747 748
  }

749
  if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
750 751
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
752 753
  }

754
  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
755 756 757
    pRsp->code = terrno;
    goto _err;
  }
C
Cary Xu 已提交
758

H
Hongze Cheng 已提交
759
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
760
  return 0;
H
Hongze Cheng 已提交
761 762

_err:
H
Hongze Cheng 已提交
763
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
764
  return -1;
H
Hongze Cheng 已提交
765 766
}

767
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
768
  SDecoder           decoder = {0};
769
  SEncoder           encoder = {0};
770
  int32_t            rcode = 0;
H
Hongze Cheng 已提交
771 772
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
773 774 775
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
776
  STbUidStore       *pStore = NULL;
777
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
778 779

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
780 781 782
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
783

H
Hongze Cheng 已提交
784
  // decode
H
Hongze Cheng 已提交
785 786
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
787 788 789 790
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
791

H
Hongze Cheng 已提交
792
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
793 794
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
795 796 797 798 799
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
800
  // loop to create table
801
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
802
    pCreateReq = req.pReqs + iReq;
D
dapan1121 已提交
803
    memset(&cRsp, 0, sizeof(cRsp));
H
Hongze Cheng 已提交
804

C
Cary Xu 已提交
805 806 807 808
    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }
L
Liu Jicong 已提交
809

wafwerar's avatar
wafwerar 已提交
810 811 812 813 814
    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

H
Hongze Cheng 已提交
815 816 817 818 819 820 821 822 823
    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
824
    if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
H
Hongze Cheng 已提交
825 826 827 828 829
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
830
    } else {
H
Hongze Cheng 已提交
831
      cRsp.code = TSDB_CODE_SUCCESS;
832
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
833
      taosArrayPush(tbUids, &pCreateReq->uid);
L
Liu Jicong 已提交
834
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
H
Hongze Cheng 已提交
835
    }
H
Hongze Cheng 已提交
836 837

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
838 839
  }

H
Haojun Liao 已提交
840
  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
841
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
842
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
C
Cary Xu 已提交
843 844
    goto _exit;
  }
845
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
846

H
Hongze Cheng 已提交
847
  // prepare rsp
848
  int32_t ret = 0;
wafwerar's avatar
wafwerar 已提交
849
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
850 851 852 853 854 855
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
856 857
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
H
Hongze Cheng 已提交
858

H
Hongze Cheng 已提交
859
_exit:
wmmhello's avatar
wmmhello 已提交
860 861
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
862
    taosMemoryFree(pCreateReq->comment);
wmmhello's avatar
wmmhello 已提交
863 864
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
865
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
866
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
867 868
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
869
  return rcode;
H
Hongze Cheng 已提交
870 871
}

872
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
888
  }
H
Hongze Cheng 已提交
889

890
  if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) {
H
Hongze Cheng 已提交
891 892 893 894 895 896 897
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
898 899 900
  return 0;
}

901
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
902
  SVDropStbReq req = {0};
903
  int32_t      rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
904
  SDecoder     decoder = {0};
905
  SArray      *tbUidList = NULL;
H
Hongze Cheng 已提交
906 907 908 909 910 911

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
912 913
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
914 915 916 917 918
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
919 920
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) goto _exit;
921
  if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
922 923 924 925 926
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
927 928 929
    rcode = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
930

931 932 933 934 935
  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
936 937
  // return rsp
_exit:
938
  if (tbUidList) taosArrayDestroy(tbUidList);
H
Hongze Cheng 已提交
939
  pRsp->code = rcode;
H
Hongze Cheng 已提交
940
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
941 942 943
  return 0;
}

944
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
D
dapan1121 已提交
945 946 947
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
948 949
  int32_t       rcode = 0;
  int32_t       ret;
D
dapan1121 已提交
950 951
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
H
Hongze Cheng 已提交
952 953 954 955 956 957 958 959 960 961

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
962
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
963
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
964 965
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
966 967 968
  }

  // process
969
  if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
970
    vAlterTbRsp.code = terrno;
H
Hongze Cheng 已提交
971
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
972 973
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
974 975
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
976

D
dapan1121 已提交
977 978 979 980 981
  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

H
Hongze Cheng 已提交
982 983 984 985 986 987
_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
M
Minglei Jin 已提交
988 989 990
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
  }
H
Hongze Cheng 已提交
991 992 993
  return 0;
}

994
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
995 996
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
997
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
998
  SEncoder         encoder = {0};
999
  int32_t          ret;
1000
  SArray          *tbUids = NULL;
1001
  STbUidStore     *pStore = NULL;
H
Hongze Cheng 已提交
1002

H
Hongze Cheng 已提交
1003
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
1004 1005 1006
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
1007 1008

  // decode req
H
Hongze Cheng 已提交
1009 1010
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
1011 1012 1013 1014 1015
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
1016 1017

  // process req
1018
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
H
Hongze Cheng 已提交
1019
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
1020 1021
  if (tbUids == NULL || rsp.pArray == NULL) goto _exit;

1022
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
H
Hongze Cheng 已提交
1023 1024
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
1025
    tb_uid_t     tbUid = 0;
H
Hongze Cheng 已提交
1026

H
Hongze Cheng 已提交
1027
    /* code */
1028
    ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
H
Hongze Cheng 已提交
1029
    if (ret < 0) {
1030
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
H
Hongze Cheng 已提交
1031 1032 1033 1034
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
1035
    } else {
H
Hongze Cheng 已提交
1036
      dropTbRsp.code = TSDB_CODE_SUCCESS;
1037
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
H
Hongze Cheng 已提交
1038 1039 1040 1041 1042
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

1043
  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
1044
  tdUpdateTbUidList(pVnode->pSma, pStore, false);
1045

H
Hongze Cheng 已提交
1046
_exit:
1047
  taosArrayDestroy(tbUids);
1048
  tdUidStoreFree(pStore);
H
Hongze Cheng 已提交
1049
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
1050 1051 1052 1053 1054
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
1055
  taosArrayDestroy(rsp.pArray);
H
Hongze Cheng 已提交
1056 1057 1058
  return 0;
}

1059 1060
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
D
dapan 已提交
1061 1062 1063 1064
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;
C
Cary Xu 已提交
1065
  int32_t        rv = -1;
D
dapan 已提交
1066 1067 1068

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
1069 1070 1071 1072 1073

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema) {
    suid = msgIter->suid;
    rv = TD_ROW_SVER(blkIter.row);
D
dapan 已提交
1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
typedef struct SSubmitReqConvertCxt {
  SSubmitMsgIter msgIter;
  SSubmitBlk    *pBlock;
  SSubmitBlkIter blkIter;
  STSRow        *pRow;
  STSRowIter     rowIter;
  SSubmitTbData *pTbData;
  STSchema      *pTbSchema;
  SArray        *pColValues;
} SSubmitReqConvertCxt;

static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1103 1104
  pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid > 0 ? pCxt->msgIter.suid : pCxt->msgIter.uid,
                                     pCxt->msgIter.sversion, 1);
1105 1106 1107 1108 1109
  if (NULL == pCxt->pTbSchema) {
    return TSDB_CODE_INVALID_MSG;
  }
  tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);

1110
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141
  if (NULL == pCxt->pTbData) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pCxt->pTbData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  pCxt->pTbData->flags = 0;
  pCxt->pTbData->suid = pCxt->msgIter.suid;
  pCxt->pTbData->uid = pCxt->msgIter.uid;
  pCxt->pTbData->sver = pCxt->msgIter.sversion;
  pCxt->pTbData->pCreateTbReq = NULL;
  pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (NULL == pCxt->pTbData->aRowP) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
  if (NULL == pCxt->pColValues) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
    SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
    taosArrayPush(pCxt->pColValues, &val);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  taosMemoryFreeClear(pCxt->pTbSchema);
1142
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
  taosMemoryFreeClear(pCxt->pTbData);
  taosArrayDestroy(pCxt->pColValues);
}

static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
    return TSDB_CODE_SUCCESS;
  }

  if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pColVal->value.nData = varDataLen(pCellVal->val);
1160
    pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198
  } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
    float f = GET_FLOAT_VAL(pCellVal->val);
    memcpy(&pColVal->value.val, &f, sizeof(f));
  } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
    pColVal->value.val = *(int64_t *)pCellVal->val;
  } else {
    GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
  }

  pColVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  int32_t code = TSDB_CODE_SUCCESS;
  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
  for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) {
    STColumn *pCol = pCxt->pTbSchema->columns + i;
    SCellVal  cellVal = {0};
    if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) {
      break;
    }
    code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i));
  }
  return code;
}

static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  if (pCxt->msgIter.schemaLen <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pCxt->pTbData->pCreateTbReq) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDecoder decoder = {0};
1199
  tDecoderInit(&decoder, (uint8_t *)pCxt->pBlock->data, pCxt->msgIter.schemaLen);
1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
  int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq);
  tDecoderClear(&decoder);

  return code;
}

static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}

static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;
1252
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
1253 1254 1255 1256 1257 1258 1259 1260
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
1261
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
1262
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
1263 1264 1265 1266 1267 1268 1269 1270
    tEncoderClear(&encoder);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  }
  return code;
}

1271
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1272
  int32_t code = 0;
D
dapan1121 已提交
1273
  terrno = 0;
C
Cary Xu 已提交
1274

H
Hongze Cheng 已提交
1275
  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
H
Hongze Cheng 已提交
1276
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
H
Hongze Cheng 已提交
1277
  SArray      *newTbUids = NULL;
H
Hongze Cheng 已提交
1278 1279 1280 1281
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
1282

1283
  void           *pAllocMsg = NULL;
1284
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
1285
  if (0 == pMsg->version) {
1286 1287 1288 1289
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
1290 1291 1292
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
1293 1294 1295 1296 1297 1298 1299 1300 1301
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
1302
    if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
1303 1304 1305 1306
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    tDecoderClear(&dc);
D
dapan 已提交
1307
  }
C
Cary Xu 已提交
1308

H
Hongze Cheng 已提交
1309 1310 1311 1312 1313 1314 1315
  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

H
Hongze Cheng 已提交
1316 1317 1318 1319 1320
    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

H
Hongze Cheng 已提交
1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
      TSKEY    *aKey = (TSKEY *)(pColData->pData);

      for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
        if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
          code = TSDB_CODE_INVALID_MSG;
1333
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344
          goto _exit;
        }
      }

    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
          code = TSDB_CODE_INVALID_MSG;
1345
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
H
Hongze Cheng 已提交
1346 1347 1348 1349 1350 1351
          goto _exit;
        }
      }
    }
  }

H
Hongze Cheng 已提交
1352
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1353 1354 1355 1356 1357 1358 1359 1360 1361 1362
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
D
dapan1121 已提交
1363
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
H
Hongze Cheng 已提交
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (info.suid) {
        metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }
H
Hongze Cheng 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

1397 1398
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
H
Hongze Cheng 已提交
1399 1400 1401 1402 1403
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
H
Hongze Cheng 已提交
1404 1405
  }

L
Liu Jicong 已提交
1406 1407
  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

H
Hongze Cheng 已提交
1408
  // loop to handle
H
Hongze Cheng 已提交
1409
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
H
Hongze Cheng 已提交
1410 1411 1412 1413
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
H
Hongze Cheng 已提交
1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
H
Hongze Cheng 已提交
1425
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1426 1427 1428 1429 1430 1431
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);

      // create table
1432
      if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
1433
        // create table success
H
Hongze Cheng 已提交
1434 1435 1436

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
H
Hongze Cheng 已提交
1437
          code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1438 1439 1440 1441 1442 1443
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
D
dapan1121 已提交
1444
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
H
Hongze Cheng 已提交
1445 1446 1447 1448 1449 1450
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
X
Xiaoyu Wang 已提交
1451
        terrno = 0;
1452
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
H
Hongze Cheng 已提交
1453
      }
H
Hongze Cheng 已提交
1454 1455 1456
    }

    // insert data
H
Hongze Cheng 已提交
1457
    int32_t affectedRows;
1458
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
H
Hongze Cheng 已提交
1459 1460
    if (code) goto _exit;

1461 1462 1463
    code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
    if (code) goto _exit;

H
Hongze Cheng 已提交
1464
    pSubmitRsp->affectedRows += affectedRows;
H
Hongze Cheng 已提交
1465
  }
H
Hongze Cheng 已提交
1466

1467
  // update the affected table uid list
H
Hongze Cheng 已提交
1468 1469 1470 1471
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
H
Hongze Cheng 已提交
1472
  }
H
Hongze Cheng 已提交
1473 1474 1475 1476 1477 1478 1479 1480 1481 1482

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSSubmitRsp2(&ec, pSubmitRsp);
  tEncoderClear(&ec);

H
Hongze Cheng 已提交
1483 1484 1485 1486 1487 1488
  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
1489
    tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
H
Hongze Cheng 已提交
1490 1491
  }

H
Hongze Cheng 已提交
1492 1493
  // clear
  taosArrayDestroy(newTbUids);
1494
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
H
Hongze Cheng 已提交
1495
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
H
Hongze Cheng 已提交
1496

D
dapan1121 已提交
1497 1498
  if (code) terrno = code;

1499
  taosMemoryFree(pAllocMsg);
1500

H
Hongze Cheng 已提交
1501
  return code;
L
Liu Jicong 已提交
1502
}
1503

1504
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1505
  SVCreateTSmaReq req = {0};
C
Cary Xu 已提交
1506
  SDecoder        coder = {0};
1507

C
Cary Xu 已提交
1508 1509 1510 1511 1512 1513
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }
1514 1515 1516 1517 1518

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
C
Cary Xu 已提交
1519 1520
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
1521
    goto _err;
1522
  }
C
Cary Xu 已提交
1523

1524
  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
C
Cary Xu 已提交
1525
    if (pRsp) pRsp->code = terrno;
1526
    goto _err;
1527
  }
C
Cary Xu 已提交
1528

1529
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1530
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
1531
         req.indexName, req.indexUid, ver, req.tableUid);
H
Hongze Cheng 已提交
1532
  return 0;
1533 1534 1535

_err:
  tDecoderClear(&coder);
S
Shengliang Guan 已提交
1536
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
1537
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
1538
  return -1;
L
Liu Jicong 已提交
1539
}
C
Cary Xu 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551

/**
 * @brief specific for smaDstVnode
 *
 * @param pVnode
 * @param pCont
 * @param contLen
 * @return int32_t
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
1552

1553
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
B
Benguang Zhao 已提交
1554 1555 1556
  int32_t code = TSDB_CODE_SUCCESS;

  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
1557
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
B
Benguang Zhao 已提交
1558 1559

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
M
Minglei Jin 已提交
1560
  code = metaTrimTables(pVnode->pMeta);
B
Benguang Zhao 已提交
1561 1562 1563 1564

  return code;
}

1565
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
B
Benguang Zhao 已提交
1566 1567 1568 1569 1570 1571
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
  int32_t code = TSDB_CODE_SUCCESS;
  if (!pVnode->config.hashChange) {
    goto _exit;
  }

1572
  code = vnodeConsolidateAlterHashRange(pVnode, ver);
1573
  if (code < 0) {
1574
    vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
1575 1576
    goto _exit;
  }
B
Benguang Zhao 已提交
1577 1578 1579
  pVnode->config.hashChange = false;

_exit:
1580
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
B
Benguang Zhao 已提交
1581
  pRsp->code = code;
1582 1583 1584
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

1585
  return code;
1586
}
S
Shengliang Guan 已提交
1587

1588
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
1589 1590
  bool walChanged = false;
  bool tsdbChanged = false;
1591

S
Shengliang Guan 已提交
1592 1593
  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
1594 1595 1596 1597
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

1598
  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
1599 1600
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
        "walRetentionSize:%d",
1601 1602
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
1603
        req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);
1604 1605 1606

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
1607 1608
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }
1609

1610
  if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
S
Shengliang Guan 已提交
1611
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
1612
          (uint64_t)(req.buffer * 1024LL * 1024LL));
1613
    pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
H
Hongze Cheng 已提交
1614 1615
  }

1616 1617
  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
S
Shengliang Guan 已提交
1618
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
1619
             pVnode->config.szCache, req.pages, tstrerror(errno));
H
Hongze Cheng 已提交
1620 1621
      return errno;
    } else {
S
Shengliang Guan 已提交
1622
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
1623
      pVnode->config.szCache = req.pages;
H
Hongze Cheng 已提交
1624
    }
H
Hongze Cheng 已提交
1625 1626
  }

1627 1628
  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
1629 1630
  }

1631 1632
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
1633 1634 1635
    walChanged = true;
  }

1636 1637
  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
1638 1639 1640 1641 1642 1643 1644
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }
1645

1646 1647
  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
1648 1649 1650
    walChanged = true;
  }

1651 1652
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
1653
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1654
      tsdbChanged = true;
1655 1656 1657
    }
  }

1658 1659
  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
1660
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1661
      tsdbChanged = true;
1662 1663 1664
    }
  }

1665 1666
  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
1667
    if (!VND_IS_RSMA(pVnode)) {
M
Minglei Jin 已提交
1668
      tsdbChanged = true;
1669 1670 1671
    }
  }

1672 1673 1674 1675 1676 1677 1678 1679
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    pVnode->config.sttTrigger = req.sttTrigger;
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

1680
  if (walChanged) {
M
Minglei Jin 已提交
1681 1682 1683 1684 1685
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
1686 1687
  }

1688 1689 1690
  return 0;
}

1691
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1692 1693 1694 1695 1696
  SBatchDeleteReq deleteReq;
  SDecoder        decoder;
  tDecoderInit(&decoder, pReq, len);
  tDecodeSBatchDeleteReq(&decoder, &deleteReq);

1697
  SMetaReader mr = {0};
1698
  metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
1699

1700 1701 1702
  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
1703 1704
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
S
Shengliang Guan 已提交
1705
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
1706 1707 1708 1709 1710
      continue;
    }

    int64_t uid = mr.me.uid;

1711
    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1712 1713 1714
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
L
Liu Jicong 已提交
1715
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
1716
    }
1717

1718 1719 1720 1721 1722 1723 1724 1725
    code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
             ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

1726
    tDecoderClear(&mr.coder);
1727
  }
1728
  metaReaderClear(&mr);
1729
  taosArrayDestroy(deleteReq.deleteReqs);
1730 1731 1732
  return 0;
}

1733
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
1734 1735 1736 1737
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

wmmhello's avatar
wmmhello 已提交
1738 1739 1740 1741 1742
  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

H
Hongze Cheng 已提交
1743 1744 1745 1746 1747 1748 1749 1750
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
1751
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
H
Hongze Cheng 已提交
1752 1753

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
1754 1755 1756 1757
    uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
    code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
    if (code) goto _err;
    code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs);
H
Hongze Cheng 已提交
1758 1759 1760 1761 1762
    if (code) goto _err;
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
wmmhello's avatar
wmmhello 已提交
1763 1764

  SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
L
Liu Jicong 已提交
1765
  int32_t     ret = 0;
wmmhello's avatar
wmmhello 已提交
1766 1767
  tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
L
Liu Jicong 已提交
1768
  SEncoder ec = {0};
wmmhello's avatar
wmmhello 已提交
1769 1770 1771
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVDeleteRsp(&ec, &rsp);
  tEncoderClear(&ec);
H
Hongze Cheng 已提交
1772 1773 1774 1775
  return code;

_err:
  return code;
M
Minglei Jin 已提交
1776
}
1777
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);
  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }
1793
  if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1794 1795 1796 1797 1798 1799 1800 1801
    pRsp->code = terrno;
    goto _err;
  }
  tDecoderClear(&dc);
  return 0;
_err:
  tDecoderClear(&dc);
  return -1;
dengyihao's avatar
dengyihao 已提交
1802
}
1803
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
dengyihao's avatar
dengyihao 已提交
1804
  SDropIndexReq req = {0};
dengyihao's avatar
dengyihao 已提交
1805
  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
dengyihao's avatar
dengyihao 已提交
1806 1807 1808 1809 1810 1811 1812 1813
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSDropIdxReq(pReq, len, &req)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1814

1815
  if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
dengyihao's avatar
dengyihao 已提交
1816 1817 1818
    pRsp->code = terrno;
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1819 1820
  return TSDB_CODE_SUCCESS;
}
1821

1822
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
1823

1824 1825
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
H
Hongze Cheng 已提交
1826
}
1827

H
Hongze Cheng 已提交
1828
#ifndef TD_ENTERPRISE
1829
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1830
  return 0;
dengyihao's avatar
dengyihao 已提交
1831
}
H
Hongze Cheng 已提交
1832
#endif