vnodeSvr.c 24.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/* Forward declarations of the per-message write handlers defined later in this file. */

/* super table */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data submit and time-range SMA */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28
/*
 * Stamp server-generated values into a request buffer in place.
 *
 * For TDMT_VND_CREATE_TABLE every sub-request gets a freshly generated uid and
 * the current timestamp written at the decoder position that follows its
 * leading varint. For TDMT_VND_SUBMIT the same two values are written into the
 * schema part of each block whose schemaLen is positive. Other message types
 * are left untouched.
 *
 * The two values are written as 16 consecutive raw bytes (8 for the uid, 8 for
 * the creation time) in host byte order.
 * NOTE(review): nothing here verifies that 16 bytes remain in the buffer at
 * the write position; confirm the encoder always reserves them.
 *
 * pVnode is not used. Always returns 0.
 */
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      const int64_t ctime = taosGetTimestampMs();
      int32_t       nReqs = 0;  // stays 0 (no iterations) if the count cannot be decoded

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        const tb_uid_t uid = tGenIdPI64();
        const int64_t  rawUid = (int64_t)uid;
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);

        // memcpy instead of a casted store: the position inside the byte
        // buffer is not guaranteed to be 8-byte aligned.
        uint8_t *pSlot = (uint8_t *)(dc.data + dc.pos);
        memcpy(pSlot, &rawUid, sizeof(rawUid));
        memcpy(pSlot + sizeof(rawUid), &ctime, sizeof(ctime));

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      const int64_t  ctime = taosGetTimestampMs();

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        // only blocks that carry a schema part are stamped
        if (msgIter.schemaLen > 0) {
          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);

          const int64_t rawUid = tGenIdPI64();
          uint8_t      *pSlot = (uint8_t *)(dc.data + dc.pos);
          memcpy(pSlot, &rawUid, sizeof(rawUid));
          memcpy(pSlot + sizeof(rawUid), &ctime, sizeof(ctime));

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    default:
      break;
  }

  return 0;
}

H
Hongze Cheng 已提交
86
/*
 * Apply one write request to the vnode.
 *
 * Records `version` as the applied version, dispatches the message body (the
 * payload after SMsgHead) to the handler for its type, pushes the raw message
 * to TQ and commits when vnodeShouldCommit() says so.
 *
 * Submit requests receive the whole message (header included); every other
 * handler receives the payload that follows SMsgHead. Failures of the
 * TQ handlers (vg change / vg delete / task deploy) are currently ignored.
 *
 * Returns 0 on success, -1 when a meta/TSDB/SMA handler fails or the message
 * cannot be pushed to TQ.
 */
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void *pReq;
  int   len;

  vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
    } break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler expects the full message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      // unlike the other TQ handlers this one is given the full message
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        // TODO: handle error
      }
      break;
    case TDMT_VND_TASK_DEPLOY: {
      if (tqProcessTaskDeploy(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error
      }
    } break;
#if 0
    case TDMT_VND_TASK_WRITE_EXEC: {
      if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                            0) < 0) {
      }
    } break;
#endif
    case TDMT_VND_ALTER_VNODE:
      break;
    default:
      ASSERT(0);
      break;
  }

  vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  // a failed write is an error, not debug noise
  vError("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

/*
 * Hand a message from the query queue to the query worker.
 *
 * Builds a read handle that points at this vnode's meta, config and message
 * callbacks and forwards TDMT_VND_QUERY / TDMT_VND_QUERY_CONTINUE to the
 * matching qWorker entry point, returning whatever that call returns.
 * Any other message type is logged and answered with TSDB_CODE_VND_APP_ERROR.
 */
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  SReadHandle handle = {0};
  handle.meta = pVnode->pMeta;
  handle.config = &pVnode->config;
  handle.vnode = pVnode;
  handle.pMsgCb = &pVnode->msgCb;

  if (pMsg->msgType == TDMT_VND_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
  }

  if (pMsg->msgType == TDMT_VND_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

/*
 * Dispatch a message from the fetch queue.
 *
 * Query-worker messages go to the qWorker* handlers, table-meta requests to
 * vnodeGetTableMeta, and consume / stream-task messages to the tq* handlers.
 * The handler's result is returned unchanged; an unknown message type is
 * logged and yields TSDB_CODE_VND_APP_ERROR.
 *
 * After a TDMT_VND_TASK_RUN request has been handled, pMsg->pCont is set to
 * NULL before returning.
 */
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  // payload after the message head; only referenced by the disabled cases below
  char   *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int     code;

  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_VND_FETCH:
      code = qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_FETCH_RSP:
      code = qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_RES_READY:
      code = qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_TASKS_STATUS:
      code = qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_CANCEL_TASK:
      code = qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_DROP_TASK:
      code = qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_QUERY_HEARTBEAT:
      code = qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    /* meta */
    case TDMT_VND_TABLE_META:
      code = vnodeGetTableMeta(pVnode, pMsg);
      break;

    /* TQ */
    case TDMT_VND_CONSUME:
      code = tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
      break;
    case TDMT_VND_TASK_RUN:
      code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
      pMsg->pCont = NULL;
      break;
    case TDMT_VND_TASK_DISPATCH:
      code = tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_RECOVER:
      code = tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_DISPATCH_RSP:
      code = tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_RECOVER_RSP:
      code = tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
      break;

#if 0
    case TDMT_VND_TASK_PIPE_EXEC:
    case TDMT_VND_TASK_MERGE_EXEC:
      code = tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
      break;
    case TDMT_VND_STREAM_TRIGGER:
      // refactor, avoid double free
      code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
      pMsg->pCont = NULL;
      break;
#endif

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      code = TSDB_CODE_VND_APP_ERROR;
      break;
  }

  return code;
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

  // blockDebugShowData(data);
258
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
259 260 261
}

/*
 * Route a sync message to the matching callback of this vnode's sync node.
 *
 * The RPC message is converted into the typed sync message for its msgType,
 * handed to the corresponding syncNodeOn*Cb callback and destroyed afterwards.
 * The callback's result is returned. When the sync environment is not started
 * or the message type is not a known sync type, the error is logged and
 * TAOS_SYNC_PROPOSE_OTHER_ERROR is returned.
 *
 * pRsp is not used.
 */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;

  if (!syncEnvIsStart()) {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    return ret;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  assert(pSyncNode != NULL);

  // role, term and head are fetched but not consumed below
  ESyncState state = syncGetMyRole(pVnode->sync);
  SyncTerm   currentTerm = syncGetMyTerm(pVnode->sync);
  SMsgHead  *pHead = pMsg->pCont;

  // log the incoming message together with a short description of the sync node
  char  logBuf[512];
  char *syncNodeStr = sync2SimpleStr(pVnode->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  switch (pMsg->msgType) {
    case TDMT_VND_SYNC_TIMEOUT: {
      SyncTimeout *pTimeout = syncTimeoutFromRpcMsg2(pMsg);
      assert(pTimeout != NULL);
      ret = syncNodeOnTimeoutCb(pSyncNode, pTimeout);
      syncTimeoutDestroy(pTimeout);
    } break;

    case TDMT_VND_SYNC_PING: {
      SyncPing *pPing = syncPingFromRpcMsg2(pMsg);
      assert(pPing != NULL);
      ret = syncNodeOnPingCb(pSyncNode, pPing);
      syncPingDestroy(pPing);
    } break;

    case TDMT_VND_SYNC_PING_REPLY: {
      SyncPingReply *pPingReply = syncPingReplyFromRpcMsg2(pMsg);
      assert(pPingReply != NULL);
      ret = syncNodeOnPingReplyCb(pSyncNode, pPingReply);
      syncPingReplyDestroy(pPingReply);
    } break;

    case TDMT_VND_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pClientReq = syncClientRequestFromRpcMsg2(pMsg);
      assert(pClientReq != NULL);
      ret = syncNodeOnClientRequestCb(pSyncNode, pClientReq);
      syncClientRequestDestroy(pClientReq);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pVote = syncRequestVoteFromRpcMsg2(pMsg);
      assert(pVote != NULL);
      ret = syncNodeOnRequestVoteCb(pSyncNode, pVote);
      syncRequestVoteDestroy(pVote);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
      assert(pVoteReply != NULL);
      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pVoteReply);
      syncRequestVoteReplyDestroy(pVoteReply);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
      assert(pAppend != NULL);
      ret = syncNodeOnAppendEntriesCb(pSyncNode, pAppend);
      syncAppendEntriesDestroy(pAppend);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      assert(pAppendReply != NULL);
      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pAppendReply);
      syncAppendEntriesReplyDestroy(pAppendReply);
    } break;

    default:
      vError("==vnodeProcessSyncReq== error msg type:%d", pMsg->msgType);
      ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
      break;
  }

  syncNodeRelease(pSyncNode);
  return ret;
}

H
Hongze Cheng 已提交
351
/*
 * Handle a create-super-table request.
 *
 * Decodes the request, creates the super table in the meta store and then
 * passes the request to tdProcessRSmaCreate. pRsp is filled with an empty
 * TDMT_VND_CREATE_STB_RSP; when decoding or creation fails, pRsp->code is set
 * to terrno.
 *
 * Returns 0 on success, -1 on failure.
 */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int            rcode = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&coder, pReq, len);

  // the short-circuit keeps the meta call from running after a failed decode
  if (tDecodeSVCreateStbReq(&coder, &req) < 0 || metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
  } else {
    tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);
    rcode = 0;
  }

  tDecoderClear(&coder);
  return rcode;
}

H
Hongze Cheng 已提交
383
/*
 * Handle a batch create-table request.
 *
 * Each sub-request is validated against the vnode's table hash and then
 * created in the meta store; one SVCreateTbRsp per sub-request is collected
 * and encoded into pRsp as a TDMT_VND_CREATE_TABLE_RSP. An "already exists"
 * error is reported as success for sub-requests flagged
 * TD_CREATE_IF_NOT_EXISTS. The uids of newly created tables are forwarded to
 * TQ and to the SMA uid store.
 *
 * Returns 0 when the response was built (individual sub-requests may still
 * carry an error code), -1 with terrno set when the request cannot be decoded
 * or memory runs out.
 */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // Declared up front: the early `goto _exit` paths must not jump over its
  // initialization.
  SEncoder           encoder = {0};
  int                rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash (bounded format: the name comes from the request)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  // publish the uids of the tables that were actually created
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);  // cleared here only, i.e. only after it was initialized

_exit:
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);  // single clear for every path
  return rcode;
}

H
Hongze Cheng 已提交
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
/*
 * Handle an alter-super-table request.
 *
 * The request uses the same wire format as create-super-table
 * (SVCreateStbReq). pRsp is filled with an empty TDMT_VND_ALTER_STB_RSP.
 * A decode failure sets terrno to TSDB_CODE_INVALID_MSG (pRsp->code is left
 * untouched); a meta failure copies terrno into pRsp->code.
 *
 * Returns 0 on success, -1 on failure.
 */
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int            rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    // malformed request
    terrno = TSDB_CODE_INVALID_MSG;
    rcode = -1;
  } else if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    // meta layer rejected the change
    pRsp->code = terrno;
    rcode = -1;
  }

  tDecoderClear(&dc);
  return rcode;
}

H
Hongze Cheng 已提交
500
/*
 * Handle a drop-super-table request.
 *
 * Only the decode step is active at the moment: the request is parsed and the
 * outcome is reported in pRsp->code (TSDB_CODE_INVALID_MSG when it cannot be
 * decoded). The response carries no payload.
 *
 * Always returns 0; failures are reported through pRsp->code only.
 */
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int          rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // the reply to a drop request is a drop response, not a create response
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  // TODO: the actual drop is not wired in yet; it is expected to be
  //   if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { rcode = terrno; goto _exit; }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

H
Hongze Cheng 已提交
529 530
/*
 * Handle an alter-table request.
 *
 * Decodes the request and applies it through metaAlterTable. The outcome is
 * carried inside the encoded SVAlterTbRsp payload of pRsp
 * (TDMT_VND_ALTER_TABLE_RSP): its code is TSDB_CODE_INVALID_MSG when either
 * decoding or the meta update fails, otherwise it stays zero.
 * NOTE(review): a meta failure is reported as TSDB_CODE_INVALID_MSG rather
 * than terrno; kept as is, confirm whether that is intended.
 *
 * If the response buffer cannot be allocated, pRsp->code is set to
 * TSDB_CODE_OUT_OF_MEMORY and the response carries no payload.
 *
 * Always returns 0.
 */
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq vAlterTbReq = {0};
  SVAlterTbRsp vAlterTbRsp = {0};
  SDecoder     dc = {0};
  SEncoder     ec = {0};
  int          ret = 0;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode, then process; the meta call is skipped when decoding fails
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0 || metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&dc);

  // encode the response
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // do not advertise a payload length without a payload
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    return 0;
  }

  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  return 0;
}

H
Hongze Cheng 已提交
570
/*
 * Handle a batch drop-table request.
 *
 * Every sub-request is passed to metaDropTable; one SVDropTbRsp per
 * sub-request is collected and encoded into pRsp as TDMT_VND_DROP_TABLE_RSP.
 * A "table not exist" error is reported as success for sub-requests with
 * igNotExists set. The uid list filled by metaDropTable is forwarded to TQ.
 *
 * pRsp->code is set to TSDB_CODE_INVALID_MSG when the request cannot be
 * decoded and to TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 *
 * Always returns 0.
 */
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int              ret;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // report the failure instead of answering "success" with an empty batch
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);

  // encode whatever was collected (possibly nothing) as the response payload
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the per-table result array is owned by this function
  taosArrayDestroy(rsp.pArray);
  return 0;
}

H
Hongze Cheng 已提交
629
/*
 * Debug helper: print every row of one submit block to stdout.
 *
 * The schema is looked up in the meta store by msgIter->suid, and each row is
 * printed through tdSRowPrint with a prefix made of `tags` and the block's uid.
 *
 * Returns 0 when the block has no rows, -1 when no schema is found, and
 * TSDB_CODE_SUCCESS after printing.
 */
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTag[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1);  // TODO: use the real schema
  if (pSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTag, sizeof(rowTag), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
    tdSRowPrint(row, pSchema, rowTag);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
661
/*
 * Debug helper: walk all blocks of a submit request and print each one via
 * vnodeDebugPrintSingleSubmitMsg (whose result is not checked).
 *
 * Returns 0 after the last block, -1 if the message iterator cannot be set up
 * or advanced.
 */
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SSubmitBlk    *pBlk = NULL;
  SMeta         *pMeta = pVnode->pMeta;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) {
      return -1;
    }
    if (pBlk == NULL) {
      break;  // no more blocks
    }

    vnodeDebugPrintSingleSubmitMsg(pMeta, pBlk, &iter, tags);
  }

  return 0;
}

H
Hongze Cheng 已提交
678
/*
 * Handle a data submit request.
 *
 * pReq is the complete submit message. After tsdbScanAndConvertSubmitMsg has
 * accepted it, every block is inserted through tsdbInsertTableData. A block
 * whose schemaLen is positive carries a create-table request that is decoded
 * and applied first (an "already exists" result is tolerated). One
 * SSubmitBlkRsp per processed block is collected and encoded into pRsp.
 * When no error was recorded, the request is also passed to
 * tdProcessRSmaSubmit.
 *
 * Request-level failures (conversion, invalid message, table auto-creation,
 * out of memory) are reported through pRsp->code; a failed insert of a single
 * block is reported in that block's response code only.
 *
 * Always returns 0.
 */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  int32_t        tsize = 0;
  int32_t        ret = 0;
  SEncoder       encoder = {0};
  terrno = TSDB_CODE_SUCCESS;

  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // this block's response is never pushed, so the error must be
          // visible at request level
          submitBlkRsp.code = terrno;
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }

      submitBlkRsp.uid = createTbReq.uid;

      // room for "<db>." + table name + terminator; only the "<db>." prefix
      // is written here
      size_t fnameCap = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameCap);
      if (submitBlkRsp.tblFName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        tDecoderClear(&decoder);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameCap, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
      tDecoderClear(&decoder);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }

_exit:
  // encode whatever block responses were collected
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
  } else {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  }

  // the table names were only needed for encoding
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
      (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  }

  return 0;
}
789 790 791

/*
 * Handle a create-TSMA request.
 *
 * Decodes the request, stamps it with the server's current timezone
 * (tsTimezone) and hands it to tdProcessTSmaCreate. pRsp is filled with an
 * empty TDMT_VND_CREATE_SMA_RSP; on failure pRsp->code is set to terrno.
 *
 * Returns 0 on success, -1 on failure. Both outcomes are logged.
 */
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder;
  bool            created = false;

  pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) >= 0) {
    // record current timezone of server side
    req.timezoneInt = tsTimezone;

    if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) >= 0) {
      created = true;
    }
  }

  if (!created) {
    pRsp->code = terrno;
  }

  tDecoderClear(&coder);

  if (!created) {
    vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName,
           req.indexUid, req.tableUid, terrstr(terrno));
    return -1;
  }

  vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName,
         req.indexUid, req.tableUid);
  return 0;
}