vnodeWrite.c 5.8 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
S
slguan 已提交
22
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
#include "tsdb.h"
#include "twal.h"
S
slguan 已提交
25
#include "tdataformat.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27
#include "vnode.h"
#include "vnodeInt.h"
S
slguan 已提交
28
#include "vnodeLog.h"
J
jtao1735 已提交
29
#include "tcq.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30

J
Jeff Tao 已提交
31
/*
 * Dispatch table for write messages, indexed by message type.
 * Being a file-scope static array, every slot starts out as NULL; the
 * slots that have a handler are assigned in vnodeInitWriteFp().
 */
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);

/*
 * Per-message-type handlers stored in the table above. All share one
 * signature: the vnode, the message content, and the response descriptor.
 */
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38

39
void vnodeInitWriteFp(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40 41 42 43 44
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
45
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46 47
}

S
slguan 已提交
48
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49
  int32_t    code = 0;
S
slguan 已提交
50 51
  SVnodeObj *pVnode = (SVnodeObj *)param1;
  SWalHead  *pHead = param2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52

J
Jeff Tao 已提交
53
  if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) 
54
    return TSDB_CODE_VND_MSG_NOT_PROCESSED; 
J
Jeff Tao 已提交
55

56 57 58 59
  if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

60
  if (pHead->version == 0) { // from client or CQ 
S
slguan 已提交
61
    if (pVnode->status != TAOS_VN_STATUS_READY) 
62
      return TSDB_CODE_VND_INVALID_VGROUP_ID;  // it may be in deleting or closing state
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63

64
    if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
65
      return TSDB_CODE_RPC_NOT_READY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
66 67 68 69

    // assign version
    pVnode->version++;
    pHead->version = pVnode->version;
70
  } else { // from wal or forward 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72 73 74 75 76 77 78 79 80
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= pVnode->version) return 0;
  }

  pVnode->version = pHead->version;

  // write into WAL
  code = walWrite(pVnode->wal, pHead);
  if (code < 0) return code;

81
  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync 
82
  int32_t syncCode = 0;
83
  syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
84 85
  if (syncCode < 0) return syncCode;

86
  // write data locally 
87 88
  code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
  if (code < 0) return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89

90
  return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92
}

J
Jeff Tao 已提交
93
/*
 * Handles a submit (data insert) message: allocates the response buffer,
 * stores it in pRet, and passes the message content to tsdbInsertData().
 *
 * pVnode: vnode whose tsdb receives the data.
 * pCont:  message content, handed to tsdbInsertData() unchanged.
 * pRet:   response descriptor; its len and rsp members are set here.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when tsdbInsertData() reports failure.
 */
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

  // save insert result into item
  pRet->len = sizeof(SShellSubmitRspMsg);
  pRet->rsp = rpcMallocCont(pRet->len);
  // NOTE(review): the result of rpcMallocCont() is used without a NULL check - confirm
  // whether it can fail and what the caller expects in pRet if it does.
  SShellSubmitRspMsg *pRsp = pRet->rsp;

  int32_t code = (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) ? terrno : TSDB_CODE_SUCCESS;

  // TODO: report failed blocks. Once implemented, pRet->len also has to grow by
  // numOfFailedBlocks * sizeof(SShellSubmitRspBlock).
  pRsp->numOfFailedBlocks = 0;
  pRsp->code = 0;
  pRsp->numOfRows = htonl(1);

  return code;
}

J
Jeff Tao 已提交
112
/*
 * Handles a create-table message: builds a table configuration from the
 * message and creates the table in the vnode's tsdb.
 *
 * pVnode: vnode whose tsdb receives the new table.
 * pCont:  message content, interpreted as SMDCreateTableMsg.
 * pRet:   unused.
 *
 * Returns terrno when the configuration cannot be built, otherwise the
 * result of tsdbCreateTable().
 */
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDCreateTableMsg *pCreate = pCont;

  STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(pCreate);
  if (pTableCfg == NULL) {
    return terrno;
  }

  int32_t result = tsdbCreateTable(pVnode->tsdb, pTableCfg);

  // the configuration is only needed for the call above
  tsdbClearTableCfg(pTableCfg);

  return result;
}

J
Jeff Tao 已提交
122
/*
 * Handles a drop-table message: identifies the table by the uid and sid
 * carried in the message and drops it from the vnode's tsdb.
 *
 * pVnode: vnode whose tsdb holds the table.
 * pCont:  message content, interpreted as SMDDropTableMsg.
 * pRet:   unused.
 *
 * Returns the result of tsdbDropTable().
 */
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropTableMsg *pDrop = pCont;

  vTrace("vgId:%d, table:%s, start to drop", pVnode->vgId, pDrop->tableId);

  // uid and sid are byte-swapped with htobe64()/htonl() before use; presumably
  // the message carries them in network byte order - confirm against the sender.
  STableId target = {
    .uid = htobe64(pDrop->uid),
    .tid = htonl(pDrop->sid)
  };

  return tsdbDropTable(pVnode->tsdb, target);
}

J
Jeff Tao 已提交
137
/*
 * Handles an alter-table message. Currently a no-op: none of the arguments
 * are examined and success is always reported.
 */
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  (void)pVnode;
  (void)pCont;
  (void)pRet;

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
141
/*
 * Handles a drop-super-table message: drops the entry identified by the uid
 * carried in the message from the vnode's tsdb and traces the outcome.
 *
 * pVnode: vnode whose tsdb holds the super table.
 * pCont:  message content, interpreted as SMDDropSTableMsg.
 * pRet:   unused.
 *
 * Returns the result of tsdbDropTable().
 */
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropSTableMsg *pDrop = pCont;

  vTrace("vgId:%d, stable:%s, start to drop", pVnode->vgId, pDrop->tableId);

  // only the uid comes from the message; tid is fixed at -1 for this request
  STableId target = {
    .uid = htobe64(pDrop->uid),
    .tid = -1
  };

  int32_t result = tsdbDropTable(pVnode->tsdb, target);

  vTrace("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pDrop->tableId, tstrerror(result));

  return result;
}

H
TD-90  
Hongze Cheng 已提交
159 160 161 162
/*
 * Handles an update-tag-value message by passing it to tsdbUpdateTagValue().
 *
 * pVnode: vnode whose tsdb is updated.
 * pCont:  message content, interpreted as SUpdateTableTagValMsg.
 * pRet:   unused.
 *
 * Returns the result of tsdbUpdateTagValue().
 */
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SUpdateTableTagValMsg *pUpdate = pCont;

  return tsdbUpdateTagValue(pVnode->tsdb, pUpdate);
}

J
Jeff Tao 已提交
163
int vnodeWriteToQueue(void *param, void *data, int type) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
  SVnodeObj *pVnode = param;
J
Jeff Tao 已提交
165
  SWalHead *pHead = data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166 167 168 169 170

  int size = sizeof(SWalHead) + pHead->len;
  SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
  memcpy(pWal, pHead, size);

J
Jeff Tao 已提交
171 172
  atomic_add_fetch_32(&pVnode->refCount, 1);
  taosWriteQitem(pVnode->wqueue, type, pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173 174 175 176

  return 0;
}