vnodeWrite.c 7.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
S
slguan 已提交
22
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
#include "tsdb.h"
#include "twal.h"
B
Bomin Zhang 已提交
25
#include "tsync.h"
S
slguan 已提交
26
#include "tdataformat.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27 28
#include "vnode.h"
#include "vnodeInt.h"
B
Bomin Zhang 已提交
29
#include "syncInt.h"
J
jtao1735 已提交
30
#include "tcq.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31

J
Jeff Tao 已提交
32
// Write-handler dispatch table, indexed by message type. The array has static storage, so
// every slot without a registered handler is NULL.
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);

// Handlers stored in the table above. All share one signature: the target vnode, the message
// content, and an optional response descriptor.
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39

40
void vnodeInitWriteFp(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42 43 44 45
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
46
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
}

S
TD-1918  
Shengliang Guan 已提交
49
// Process one write message against a vnode.
//
// Steps, in order: look up the handler for the message type, validate/assign the version,
// forward to sync peers, append to the WAL, advance the vnode version, run the handler.
//
//   param  - the vnode, passed as void * and used as an SVnodeObj *
//   qtype  - queue type, passed through unchanged to syncForwardToPeer
//   pWrite - the queued item; its pHead is the WAL record and its rspRet receives any response
//
// Returns:
//   TSDB_CODE_VND_MSG_NOT_PROCESSED - no handler is registered for the message type
//   TSDB_CODE_APP_NOT_READY         - message has version 0 and the vnode is not READY or not master
//   0                               - message has a non-zero version that is not newer than the vnode's
//   a negative code                 - syncForwardToPeer, walWrite or the handler reported failure
//   otherwise the value returned by syncForwardToPeer
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
  SVnodeObj *pVnode = param;
  SWalHead * pHead = pWrite->pHead;

  int32_t (*handler)(SVnodeObj *, void *, SRspRet *) = vnodeProcessWriteMsgFp[pHead->msgType];
  if (handler == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (pHead->version != 0) {
    // from wal or forward: the version may be smaller than what is already applied, skip it then
    if (pHead->version <= pVnode->version) return 0;
  } else {
    // from client or CQ: only a READY master accepts it
    if (pVnode->status != TAOS_VN_STATUS_READY) {
      vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->status);
      return TSDB_CODE_APP_NOT_READY;  // it may be in deleting or closing state
    }

    if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
      vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->syncCfg.replica, syncRole[pVnode->role]);
      return TSDB_CODE_APP_NOT_READY;
    }

    // assign the next version to the new message
    pHead->version = pVnode->version + 1;

    // optional artificial delay; usleep takes microseconds, so delay is scaled by 1000
    if (pVnode->delay) usleep(pVnode->delay * 1000);
  }

  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
  int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype);
  if (syncCode < 0) return syncCode;

  // write into WAL
  int32_t code = walWrite(pVnode->wal, pHead);
  if (code < 0) return code;

  // the vnode version advances once the record is in the WAL, before the local write runs
  pVnode->version = pHead->version;

  // write data locally
  code = handler(pVnode, pHead->cont, &pWrite->rspRet);
  return (code < 0) ? code : syncCode;
}

S
TD-1768  
Shengliang Guan 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// Check whether a vnode can currently accept a write.
//
//   param - the vnode, passed as void * and used as an SVnodeObj *
//
// Returns:
//   TSDB_CODE_VND_NO_WRITE_AUTH - the TSDB_VN_WRITE_ACCCESS bit is not set in accessState
//   TSDB_CODE_APP_NOT_READY     - tsdb is NULL, or the vnode status is CLOSING
//   TSDB_CODE_SUCCESS           - otherwise
//
// Each rejection is logged at debug level together with the vnode's reference count.
int32_t vnodeCheckWrite(void *param) {
  SVnodeObj *pVnode = param;
  if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
    vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
    vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
    vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
           pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121 122 123 124 125
// Pass a forward confirmation (version and result code) on to the sync instance of the vnode.
// param is the vnode, passed as void * and used as an SVnodeObj *.
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
  syncConfirmForward(((SVnodeObj *)param)->sync, version, code);
}

J
Jeff Tao 已提交
126
// Handle a submit (data insert) message.
//
//   pVnode - target vnode; the data goes into pVnode->tsdb
//   pCont  - message content, handed to tsdbInsertData unchanged
//   pRet   - optional response descriptor; when non-NULL a SShellSubmitRspMsg buffer is
//            allocated with rpcMallocCont and handed to tsdbInsertData to receive the result
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbInsertData reports failure.
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  int32_t code = TSDB_CODE_SUCCESS;

  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

  // save insert result into item
  SShellSubmitRspMsg *pRsp = NULL;
  if (pRet) {
    pRet->len = sizeof(SShellSubmitRspMsg);
    pRet->rsp = rpcMallocCont(pRet->len);
    if (pRet->rsp == NULL) {
      // No buffer was obtained: do not advertise a response length for a NULL response.
      // The insert itself still proceeds, exactly as it does when pRet is NULL.
      pRet->len = 0;
    }
    pRsp = pRet->rsp;
  }

  if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;

  return code;
}

J
Jeff Tao 已提交
144
// Handle a create-table message.
//
// Builds a table configuration from the message, creates the table in pVnode->tsdb, then
// releases the configuration. pRet is not used.
//
// Returns TSDB_CODE_SUCCESS, or terrno when building the configuration or creating the table
// fails (both failure paths assert that terrno is non-zero).
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
  if (pTableCfg == NULL) {
    ASSERT(terrno != 0);
    return terrno;
  }

  int32_t result = TSDB_CODE_SUCCESS;
  if (tsdbCreateTable(pVnode->tsdb, pTableCfg) < 0) {
    // capture terrno before the cleanup call below gets a chance to touch it
    result = terrno;
    ASSERT(result != 0);
  }

  tsdbClearTableCfg(pTableCfg);
  return result;
}

J
Jeff Tao 已提交
162
// Handle a drop-table message.
//
// The table is identified by the uid and tid carried in the message, each passed through a
// byte-order conversion (htobe64 / htonl) before use. pRet is not used.
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable reports failure.
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropTableMsg *pDrop = pCont;

  vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pDrop->tableId);

  STableId target = {.uid = htobe64(pDrop->uid), .tid = htonl(pDrop->tid)};
  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
174
// Handle an alter-table message.
//
// TODO (kept from the original author's note "disposed in tsdb"): nothing is applied here yet.
// The message is only logged and success is returned; pCont and pRet are not used.
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  vDebug("vgId:%d, alter table msg is received", pVnode->vgId);
  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
185
// Handle a drop-super-table message.
//
// The super table is identified by the uid from the message (passed through htobe64) with the
// tid fixed to -1. The outcome is logged at debug level. pRet is not used.
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable reports failure.
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SDropSTableMsg *pDrop = pCont;

  vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pDrop->tableId);

  STableId target = {.uid = htobe64(pDrop->uid), .tid = -1};
  int32_t  result = TSDB_CODE_SUCCESS;
  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    result = terrno;
  }

  vDebug("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pDrop->tableId, tstrerror(result));
  return result;
}

H
TD-90  
Hongze Cheng 已提交
200
// Handle an update-tag-value message by handing it to tsdbUpdateTableTagValue.
// pRet is not used.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the tsdb call reports failure.
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SUpdateTableTagValMsg *pUpdate = (SUpdateTableTagValMsg *)pCont;

  int32_t rc = tsdbUpdateTableTagValue(pVnode->tsdb, pUpdate);
  if (rc >= 0) {
    return TSDB_CODE_SUCCESS;
  }
  return terrno;
}

S
TD-1918  
Shengliang Guan 已提交
207 208 209
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) {
  SVnodeObj *pVnode = vparam;
  SWalHead * pHead = wparam;
B
Bomin Zhang 已提交
210

S
TD-1918  
Shengliang Guan 已提交
211 212 213
  if (qtype == TAOS_QTYPE_RPC) {
    int32_t code = vnodeCheckWrite(pVnode);
    if (code != TSDB_CODE_SUCCESS) return code;
S
TD-1918  
Shengliang Guan 已提交
214
  }
B
Bomin Zhang 已提交
215

S
TD-1918  
Shengliang Guan 已提交
216 217 218 219 220
  int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
  SVWriteMsg *pWrite = taosAllocateQitem(size);
  if (pWrite == NULL) {
    return TSDB_CODE_VND_OUT_OF_MEMORY;
  }
B
Bomin Zhang 已提交
221

S
TD-1918  
Shengliang Guan 已提交
222 223 224 225 226
  if (pMsg != NULL) {
    SRpcMsg *pRpcMsg = pMsg;
    pWrite->rpcHandle = pRpcMsg->handle;
    pWrite->rpcAhandle = pRpcMsg->ahandle;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227

S
TD-1918  
Shengliang Guan 已提交
228
  memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

J
Jeff Tao 已提交
230
  atomic_add_fetch_32(&pVnode->refCount, 1);
S
TD-1746  
Shengliang Guan 已提交
231
  vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
S
Shengliang Guan 已提交
232

S
TD-1918  
Shengliang Guan 已提交
233 234
  taosWriteQitem(pVnode->wqueue, qtype, pWrite);
  return TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235
}