vnodeSvr.c 22.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/*
 * Forward declarations of the per-message-type write handlers that
 * vnodeProcessWriteReq() dispatches to. Each one receives the vnode, the
 * request version, the request buffer with its length, and the response
 * message to fill in.
 */

/* super table handlers */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table handlers */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data submit and SMA handlers */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27
/*
 * Pre-process a batch of write requests.
 *
 * The WAL-writing implementation below is compiled out with `#if 0`, so the
 * function currently touches none of its arguments and always succeeds.
 *
 * pVnode   vnode the requests belong to (unused while the body is disabled)
 * pMsgs    array of pending messages (unused while the body is disabled)
 * version  output version (not written while the body is disabled)
 *
 * Returns 0.
 */
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
  *version = pVnode->state.processed;

  for (int iMsg = 0; iMsg < taosArrayGetSize(pMsgs); iMsg++) {
    SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pMsgs, iMsg);
    SRpcMsg  *pRpcMsg = &pNodeMsg->rpcMsg;

    // set request version
    if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpcMsg->msgType, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
      vError("vnode:%d  write wal error since %s", TD_VID(pVnode), terrstr());
      return -1;
    }
  }

  walFsync(pVnode->pWal, false);
#endif

  return 0;
}

H
Hongze Cheng 已提交
50
/*
 * Apply one write request to the vnode at the given version.
 *
 * The raw message is first pushed to the TQ, then dispatched on
 * pMsg->msgType to the matching META / SMA / TSDB / TQ handler. After a
 * successful dispatch the vnode is committed and a new round is begun when
 * vnodeShouldCommit() reports that a commit is needed.
 *
 * pVnode   vnode to apply the request to; state.applied is set to `version`
 * pMsg     request message; pCont starts with an SMsgHead
 * version  version assigned to this request
 * pRsp     response message handed to the META / SMA / TSDB handlers
 *
 * Returns 0 on success, -1 when tqPushMsg() fails or when a META / SMA / TSDB
 * handler returns a negative value. Failures of the TQ handlers are logged
 * but do not fail the request.
 */
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void *pReq;
  int   len;

  vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  // the TQ receives the whole message, header included
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
    } break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler is given the message with its header
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        // TODO: handle error; for now make the failure visible in the log
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        // TODO: handle error; for now make the failure visible in the log
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
      break;
    case TDMT_VND_TASK_DEPLOY: {
      if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                              pMsg->contLen - sizeof(SMsgHead)) < 0) {
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
    } break;
    case TDMT_VND_TASK_WRITE_EXEC: {
      if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                            0) < 0) {
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
    } break;
    case TDMT_VND_ALTER_VNODE:
      // nothing to apply for this message type
      break;
    default:
      ASSERT(0);
      break;
  }

  vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vDebug("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

/*
 * Handle a message taken from the vnode query queue.
 *
 * Builds a read handle from the vnode's meta, config and message callbacks
 * and forwards TDMT_VND_QUERY / TDMT_VND_QUERY_CONTINUE to the query worker.
 *
 * Returns the query worker's result, or TSDB_CODE_VND_APP_ERROR for any other
 * message type.
 */
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");
#if 0
  SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
#endif
  // every field not assigned below stays zero
  SReadHandle readHandle = {0};
  readHandle.meta = pVnode->pMeta;
  readHandle.config = &pVnode->config;
  readHandle.vnode = pVnode;
  readHandle.pMsgCb = &pVnode->msgCb;

  if (pMsg->msgType == TDMT_VND_QUERY) {
    return qWorkerProcessQueryMsg(&readHandle, pVnode->pQuery, pMsg);
  }
  if (pMsg->msgType == TDMT_VND_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&readHandle, pVnode->pQuery, pMsg);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

/*
 * Handle a message taken from the vnode fetch queue.
 *
 * Query-related message types go to the query worker, table-meta requests to
 * vnodeGetTableMeta(), and consume / stream-task messages to the TQ.
 *
 * pInfo is only read (workerId) for TDMT_VND_CONSUME.
 *
 * Returns the selected handler's result, or TSDB_CODE_VND_APP_ERROR for an
 * unknown message type.
 */
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  int code;

  vTrace("message in fetch queue is processing");

  switch (pMsg->msgType) {
    case TDMT_VND_FETCH:
      code = qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_FETCH_RSP:
      code = qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_RES_READY:
      code = qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_TASKS_STATUS:
      code = qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_CANCEL_TASK:
      code = qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_DROP_TASK:
      code = qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_TABLE_META:
      code = vnodeGetTableMeta(pVnode, pMsg);
      break;
    case TDMT_VND_CONSUME:
      code = tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
      break;
    case TDMT_VND_TASK_PIPE_EXEC:
    case TDMT_VND_TASK_MERGE_EXEC: {
      // the task payload is what follows the SMsgHead
      char   *pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
      int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);
      code = tqProcessTaskExec(pVnode->pTq, pBody, bodyLen, 0);
    } break;
    case TDMT_VND_STREAM_TRIGGER:
      // the trigger handler is given the message with its header
      code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
      break;
    case TDMT_VND_QUERY_HEARTBEAT:
      code = qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      code = TSDB_CODE_VND_APP_ERROR;
      break;
  }

  return code;
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

  // blockDebugShowData(data);
202
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
203 204 205
}

/*
 * Handle one sync (replication) message for the vnode.
 *
 * When the sync environment is running, the vnode's sync node is acquired,
 * the RPC message is converted to the matching sync message, handed to the
 * corresponding syncNodeOn*Cb() callback, and destroyed again.
 *
 * pRsp is not used by this function.
 *
 * Returns the callback's result, or TAOS_SYNC_PROPOSE_OTHER_ERROR when the
 * sync environment is not started or the message type is not a sync type.
 */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR;

  if (!syncEnvIsStart()) {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  assert(pSyncNode != NULL);

  // role and term are queried here but not consulted below
  ESyncState state = syncGetMyRole(pVnode->sync);
  SyncTerm   currentTerm = syncGetMyTerm(pVnode->sync);
  (void)state;
  (void)currentTerm;

  // log the incoming message together with a short description of the sync node
  char  logBuf[512];
  char *syncNodeStr = sync2SimpleStr(pVnode->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  switch (pMsg->msgType) {
    case TDMT_VND_SYNC_TIMEOUT: {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_PING: {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_PING_REPLY: {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
      syncClientRequestDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } break;

    default:
      vError("==vnodeProcessSyncReq== error msg type:%d", pMsg->msgType);
      code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
      break;
  }

  syncNodeRelease(pSyncNode);

  return code;
}

H
Hongze Cheng 已提交
295
/*
 * Handle a create-super-table request.
 *
 * Decodes an SVCreateStbReq from pReq, creates the super table in the meta
 * store, and then lets the SMA module process the request through
 * tdProcessRSmaCreate().
 *
 * pVnode   vnode to create the super table in
 * version  version of the request
 * pReq     encoded request body (header already skipped by the caller)
 * len      length of pReq in bytes
 * pRsp     response; msgType is set to TDMT_VND_CREATE_STB_RSP, pCont/contLen
 *          are cleared, and code carries the error on failure
 *
 * Returns 0 on success, -1 on failure with terrno and pRsp->code set.
 */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  int            rcode = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    // Set the error explicitly, as the other handlers in this file do, so a
    // stale or zero terrno is never reported for a malformed request.
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);

  rcode = 0;

_exit:
  tDecoderClear(&coder);
  return rcode;
}

H
Hongze Cheng 已提交
327
/*
 * Handle a batch create-table request.
 *
 * Decodes an SVCreateTbBatchReq, tries to create every table in it, and
 * encodes one SVCreateTbRsp per request into pRsp->pCont. A table whose name
 * fails vnodeValidateTableHash() is answered with TSDB_CODE_VND_HASH_MISMATCH;
 * an already existing table is reported as success when the request carries
 * TD_CREATE_IF_NOT_EXISTS.
 *
 * pVnode   vnode to create the tables in
 * version  version of the request
 * pReq     encoded request body (header already skipped by the caller)
 * len      length of pReq in bytes
 * pRsp     response; on success pCont/contLen hold the encoded batch response
 *
 * Returns 0 when the response was built, -1 with terrno set when decoding or
 * an allocation fails.
 */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // Declared up front so that every `goto _exit` sees an initialised encoder.
  SEncoder           encoder = {0};
  int                rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  if (rsp.pArray == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      // remember the new table's uid so the SMA uid list can be updated below
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  // single cleanup point: each coder is cleared exactly once on every path
  taosArrayDestroy(rsp.pArray);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

H
Hongze Cheng 已提交
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
/*
 * Handle an alter-super-table request.
 *
 * The request uses the same encoding as a create-super-table request
 * (SVCreateStbReq) and is applied with metaAlterSTable().
 *
 * pRsp is reset to an empty TDMT_VND_ALTER_STB_RSP; its code is set to terrno
 * only when metaAlterSTable() fails.
 *
 * Returns 0 on success, -1 when decoding fails (terrno is set to
 * TSDB_CODE_INVALID_MSG) or when the meta update fails.
 */
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq alterReq = {0};
  SDecoder       decoder = {0};
  int            rcode = -1;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVCreateStbReq(&decoder, &alterReq) < 0) {
    // malformed request
    terrno = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterSTable(pVnode->pMeta, version, &alterReq) < 0) {
    pRsp->code = terrno;
  } else {
    rcode = 0;
  }

  tDecoderClear(&decoder);
  return rcode;
}

H
Hongze Cheng 已提交
439
/*
 * Handle a drop-super-table request.
 *
 * Only the decoding step is active: the call that would drop the table from
 * the meta store is commented out below, so a well-formed request is
 * acknowledged without changing anything.
 *
 * pRsp is reset to an empty drop-super-table response whose code is
 * TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_MSG when decoding fails.
 *
 * Always returns 0; the outcome is reported through pRsp->code.
 */
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int          rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // answer with the drop response type, not the create one
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
  //   rcode = terrno;
  //   goto _exit;
  // }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

H
Hongze Cheng 已提交
468 469
/*
 * Handle an alter-table request.
 *
 * Decodes an SVAlterTbReq, applies it with metaAlterTable(), and encodes an
 * SVAlterTbRsp into pRsp->pCont. Both a decode failure and a failed meta
 * update are reported to the requester as TSDB_CODE_INVALID_MSG inside the
 * encoded response.
 *
 * pVnode   vnode owning the table
 * version  version of the request
 * pReq     encoded request body (header already skipped by the caller)
 * len      length of pReq in bytes
 * pRsp     response; msgType is TDMT_VND_ALTER_TABLE_RSP
 *
 * Returns 0 when the response was built (even if the alter itself failed),
 * -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the response buffer cannot be
 * allocated.
 */
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq vAlterTbReq = {0};
  SVAlterTbRsp vAlterTbRsp = {0};
  SDecoder     dc = {0};
  SEncoder     ec = {0};
  int          ret = 0;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
    // process
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&dc);

  // build the response, whatever the outcome above was
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // do not advertise a payload that does not exist
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  return 0;
}

H
Hongze Cheng 已提交
509
/*
 * Handle a batch drop-table request.
 *
 * Decodes an SVDropTbBatchReq, drops every table in it with metaDropTable(),
 * and encodes one SVDropTbRsp per request into pRsp->pCont. A missing table
 * is reported as success when the request sets igNotExists.
 *
 * pVnode   vnode owning the tables
 * version  version of the request
 * pReq     encoded request body (header already skipped by the caller)
 * len      length of pReq in bytes
 * pRsp     response; msgType is TDMT_VND_DROP_TABLE_RSP. code is set when
 *          decoding or an allocation fails; per-table results are in the
 *          encoded payload
 *
 * Returns 0 when the response was built, -1 with terrno =
 * TSDB_CODE_OUT_OF_MEMORY when the response buffer cannot be allocated.
 */
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int              rcode = 0;
  int              ret;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (rsp.pArray == NULL) {
    // without the array there is nowhere to collect per-table results
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

_exit:
  tDecoderClear(&decoder);

  // the response is encoded on the failure paths too (with an empty result list)
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    rcode = -1;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the per-table results are only needed until they are encoded
  taosArrayDestroy(rsp.pArray);
  return rcode;
}

H
Hongze Cheng 已提交
561
/*
 * Debug helper: print every row of one submit block.
 *
 * The schema is looked up through metaGetTbTSchema() using msgIter->suid, and
 * each row is printed with tdSRowPrint(), prefixed by `tags` and the block's
 * uid.
 *
 * Returns 0 when the block has no rows, -1 when no schema is found,
 * TSDB_CODE_SUCCESS after printing.
 */
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
  SSubmitBlkIter rowIter = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &rowIter);
  if (rowIter.row == NULL) {
    return 0;
  }

  STSchema *pTSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1);  // TODO: use the real schema
  if (pTSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  // prefix printed in front of every row
  char rowTag[128] = {0};
  snprintf(rowTag, sizeof(rowTag), "%s: uid %" PRIi64 " ", tags, msgIter->uid);

  for (STSRow *pRow = tGetSubmitBlkNext(&rowIter); pRow != NULL; pRow = tGetSubmitBlkNext(&rowIter)) {
    tdSRowPrint(pRow, pTSchema, rowTag);
  }

  taosMemoryFreeClear(pTSchema);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
593
/*
 * Debug helper: print every block of a submit request.
 *
 * Iterates the request's blocks and prints each one with
 * vnodeDebugPrintSingleSubmitMsg(); that call's result is not checked.
 *
 * Returns 0 after the last block, -1 when the message iterator cannot be
 * initialised or advanced.
 */
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SMeta         *pVnodeMeta = pVnode->pMeta;
  SSubmitMsgIter blkCursor = {0};
  SSubmitBlk    *pCurBlk = NULL;

  if (tInitSubmitMsgIter(pMsg, &blkCursor) < 0) {
    return -1;
  }

  do {
    if (tGetSubmitMsgNext(&blkCursor, &pCurBlk) < 0) {
      return -1;
    }
    if (pCurBlk != NULL) {
      vnodeDebugPrintSingleSubmitMsg(pVnodeMeta, pCurBlk, &blkCursor, tags);
    }
  } while (pCurBlk != NULL);  // a NULL block marks the end of the request

  return 0;
}

H
Hongze Cheng 已提交
610
/*
 * Handle a data submit (insert) request.
 *
 * Walks every block of the submit request. A block that carries a schema
 * (msgIter.schemaLen > 0) first has its table created from the embedded
 * SVCreateTbReq; an already existing table is not an error. The block's rows
 * are then inserted with tsdbInsertTableData() and a per-block response is
 * collected. Finally the SSubmitRsp is encoded into pRsp->pCont and, when
 * nothing failed, the request is forwarded to tdProcessRSmaSubmit().
 *
 * pVnode   vnode to insert into
 * version  version of the request
 * pReq     the submit request, starting at its header (cast to SSubmitReq *)
 * len      length of pReq in bytes (not used here)
 * pRsp     response; code is set for request-level failures, per-block
 *          results are in the encoded payload
 *
 * Always returns 0; failures are reported through pRsp->code and the encoded
 * per-block codes.
 */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  SEncoder       encoder = {0};
  int32_t        tsize = 0;
  int32_t        ret = 0;

  terrno = TSDB_CODE_SUCCESS;

  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    // an iterator failure ends the request with an error instead of being ignored
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // This block's response is never pushed, so the failure has to be
          // reported at request level or it would be lost.
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }

      submitBlkRsp.uid = createTbReq.uid;

      // "<dbname>.<table name>" plus the terminating NUL
      size_t fnameLen = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameLen);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        tDecoderClear(&decoder);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameLen, "%s.%s", pVnode->config.dbname, createTbReq.name);

      // from here on the block is addressed by the uid of the created table
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
      tDecoderClear(&decoder);
    }

    // an insert failure is recorded per block and does not stop the loop
    if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }

_exit:
  // the response is encoded on the failure paths as well
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont != NULL) {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  } else {
    // do not advertise a payload that does not exist
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // release the table names allocated for auto-created tables
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
      (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  }

  return 0;
}

/*
 * Handle a create-SMA request.
 *
 * Decodes an SVCreateTSmaReq from pReq and registers it in the meta store
 * with metaCreateTSma().
 *
 * pVnode   vnode to create the SMA in
 * version  version of the request
 * pReq     encoded request body (header already skipped by the caller)
 * len      length of pReq in bytes
 * pRsp     response; msgType is set to TDMT_VND_CREATE_SMA_RSP, pCont/contLen
 *          are cleared, and code carries the error on failure
 *
 * Returns 0 on success, -1 on failure with terrno and pRsp->code set.
 */
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};
  int             rcode = -1;

  pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    // Set the error explicitly, as the other handlers in this file do, so a
    // stale or zero terrno is never reported for a malformed request.
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  rcode = 0;

_exit:
  tDecoderClear(&coder);
  return rcode;
}