vnodeWrite.c 8.6 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
B
Bomin Zhang 已提交
24
#include "tsync.h"
S
slguan 已提交
25
#include "tdataformat.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27
#include "vnode.h"
#include "vnodeInt.h"
B
Bomin Zhang 已提交
28
#include "syncInt.h"
J
jtao1735 已提交
29
#include "tcq.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30

J
Jeff Tao 已提交
31
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
H
TD-90  
Hongze Cheng 已提交
32 33 34 35 36 37
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38

39
void vnodeInitWriteFp(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40 41 42 43 44
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
45
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46 47
}

S
TD-1842  
Shengliang Guan 已提交
48 49 50 51 52
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
  int32_t     code = 0;
  SVnodeObj * pVnode = vparam;
  SWalHead *  pHead = wparam;
  SRspRet *   pRspRet = rparam;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
53

S
add log  
Shengliang Guan 已提交
54
  if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
S
Shengliang Guan 已提交
55 56
    vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
           taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version);
S
TD-1652  
Shengliang Guan 已提交
57
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
S
add log  
Shengliang Guan 已提交
58
  }
J
Jeff Tao 已提交
59

S
Shengliang Guan 已提交
60 61 62
  vTrace("vgId:%d, msg:%s will be processed in vnode, qtype:%s hver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId,
         taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version);

S
TD-1652  
Shengliang Guan 已提交
63
  if (pHead->version == 0) {  // from client or CQ
S
add log  
Shengliang Guan 已提交
64
    if (pVnode->status != TAOS_VN_STATUS_READY) {
S
Shengliang Guan 已提交
65 66
      vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId,
             taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version);
S
Shengliang Guan 已提交
67
      return TSDB_CODE_APP_NOT_READY;  // it may be in deleting or closing state
S
add log  
Shengliang Guan 已提交
68
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69

70
    if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
S
Shengliang Guan 已提交
71 72
      vDebug("vgId:%d, msg:%s not processed since replica:%d role:%s, qtype:%s hver:%" PRIu64, pVnode->vgId,
             taosMsg[pHead->msgType], pVnode->syncCfg.replica, syncRole[pVnode->role], qtypeStr[qtype], pHead->version);
S
Shengliang Guan 已提交
73
      return TSDB_CODE_APP_NOT_READY;
S
add log  
Shengliang Guan 已提交
74
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75 76

    // assign version
S
TD-1652  
Shengliang Guan 已提交
77
    pHead->version = pVnode->version + 1;
S
TD-1652  
Shengliang Guan 已提交
78
    if (pVnode->delay) usleep(pVnode->delay * 1000);
79

S
TD-1652  
Shengliang Guan 已提交
80
  } else {  // from wal or forward
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81 82 83 84
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= pVnode->version) return 0;
  }

S
TD-1652  
Shengliang Guan 已提交
85 86
  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
  int32_t syncCode = 0;
S
TD-1842  
Shengliang Guan 已提交
87
  syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype);
S
TD-1652  
Shengliang Guan 已提交
88
  if (syncCode < 0) return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89 90 91 92 93

  // write into WAL
  code = walWrite(pVnode->wal, pHead);
  if (code < 0) return code;

S
TD-1652  
Shengliang Guan 已提交
94
  pVnode->version = pHead->version;
95

S
TD-1652  
Shengliang Guan 已提交
96
  // write data locally
S
TD-1842  
Shengliang Guan 已提交
97
  code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
98
  if (code < 0) return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99

100
  return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102
}

S
TD-1915  
Shengliang Guan 已提交
103
static int32_t vnodeCheckWrite(void *param) {
S
TD-1768  
Shengliang Guan 已提交
104 105
  SVnodeObj *pVnode = param;
  if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
S
TD-1982  
Shengliang Guan 已提交
106
    vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
S
TD-1768  
Shengliang Guan 已提交
107 108 109 110 111
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

  // tsdb may be in reset state
  if (pVnode->tsdb == NULL) {
S
TD-1982  
Shengliang Guan 已提交
112
    vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
S
TD-1768  
Shengliang Guan 已提交
113 114 115 116
    return TSDB_CODE_APP_NOT_READY;
  }

  if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
S
TD-1982  
Shengliang Guan 已提交
117
    vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
S
TD-1768  
Shengliang Guan 已提交
118 119 120 121
           pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

S
TD-2087  
Shengliang Guan 已提交
122 123 124 125 126
  if (pVnode->isFull) {
    vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount);
    return TSDB_CODE_VND_IS_FULL;
  }

S
TD-1768  
Shengliang Guan 已提交
127 128 129
  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131 132 133 134
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  syncConfirmForward(pVnode->sync, version, code);
}

J
Jeff Tao 已提交
135
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
TD-353  
Hongze Cheng 已提交
136
  int32_t code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137

138 139
  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
140
  // save insert result into item
141
  SShellSubmitRspMsg *pRsp = NULL;
S
TD-1652  
Shengliang Guan 已提交
142
  if (pRet) {
143 144 145 146
    pRet->len = sizeof(SShellSubmitRspMsg);
    pRet->rsp = rpcMallocCont(pRet->len);
    pRsp = pRet->rsp;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147

H
TD-353  
Hongze Cheng 已提交
148
  if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
S
TD-1652  
Shengliang Guan 已提交
149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150 151 152
  return code;
}

J
Jeff Tao 已提交
153
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
TD-353  
Hongze Cheng 已提交
154
  int code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
155 156

  STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
S
TD-1415  
Shengliang Guan 已提交
157 158 159 160 161 162 163 164 165
  if (pCfg == NULL) {
    ASSERT(terrno != 0);
    return terrno;
  }

  if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) {
    code = terrno;
    ASSERT(code != 0);
  }
H
Hongze Cheng 已提交
166 167 168

  tsdbClearTableCfg(pCfg);
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169 170
}

J
Jeff Tao 已提交
171
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172
  SMDDropTableMsg *pTable = pCont;
H
TD-353  
Hongze Cheng 已提交
173
  int32_t          code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174

175
  vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pTable->tableId);
H
Haojun Liao 已提交
176
  STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->tid)};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

H
TD-353  
Hongze Cheng 已提交
178
  if (tsdbDropTable(pVnode->tsdb, tableId) < 0) code = terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179 180 181 182

  return code;
}

J
Jeff Tao 已提交
183
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
184 185 186 187 188 189
  // TODO: disposed in tsdb
  // STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
  // if (pCfg == NULL) return terrno;
  // if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) code = terrno;

  // tsdbClearTableCfg(pCfg);
190
  vDebug("vgId:%d, alter table msg is received", pVnode->vgId);
H
Hongze Cheng 已提交
191
  return TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192 193
}

J
Jeff Tao 已提交
194
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
S
TD-1732  
Shengliang Guan 已提交
195
  SDropSTableMsg *pTable = pCont;
S
TD-1915  
Shengliang Guan 已提交
196
  int32_t         code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197

198
  vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pTable->tableId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199

H
TD-353  
Hongze Cheng 已提交
200 201 202 203
  STableId stableId = {.uid = htobe64(pTable->uid), .tid = -1};

  if (tsdbDropTable(pVnode->tsdb, stableId) < 0) code = terrno;

204
  vDebug("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pTable->tableId, tstrerror(code));
H
TD-353  
Hongze Cheng 已提交
205

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208
  return code;
}

H
TD-90  
Hongze Cheng 已提交
209
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
Hongze Cheng 已提交
210
  if (tsdbUpdateTableTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) {
H
TD-353  
Hongze Cheng 已提交
211 212 213
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
H
TD-90  
Hongze Cheng 已提交
214 215
}

S
Shengliang Guan 已提交
216
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
S
TD-1918  
Shengliang Guan 已提交
217 218
  SVnodeObj *pVnode = vparam;
  SWalHead * pHead = wparam;
B
Bomin Zhang 已提交
219

S
TD-1918  
Shengliang Guan 已提交
220 221 222
  if (qtype == TAOS_QTYPE_RPC) {
    int32_t code = vnodeCheckWrite(pVnode);
    if (code != TSDB_CODE_SUCCESS) return code;
S
TD-1918  
Shengliang Guan 已提交
223
  }
B
Bomin Zhang 已提交
224

S
TD-2041  
Shengliang Guan 已提交
225 226 227 228 229
  if (pHead->len > TSDB_MAX_WAL_SIZE) {
    vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
    return TSDB_CODE_WAL_SIZE_LIMIT;
  }

S
TD-1918  
Shengliang Guan 已提交
230 231 232 233 234
  int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
  SVWriteMsg *pWrite = taosAllocateQitem(size);
  if (pWrite == NULL) {
    return TSDB_CODE_VND_OUT_OF_MEMORY;
  }
B
Bomin Zhang 已提交
235

S
TD-1842  
Shengliang Guan 已提交
236 237
  if (rparam != NULL) {
    SRpcMsg *pRpcMsg = rparam;
S
TD-1918  
Shengliang Guan 已提交
238 239 240
    pWrite->rpcHandle = pRpcMsg->handle;
    pWrite->rpcAhandle = pRpcMsg->ahandle;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241

S
TD-1918  
Shengliang Guan 已提交
242
  memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243

J
Jeff Tao 已提交
244
  atomic_add_fetch_32(&pVnode->refCount, 1);
S
TD-1746  
Shengliang Guan 已提交
245
  vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
S
Shengliang Guan 已提交
246

S
TD-1918  
Shengliang Guan 已提交
247 248
  taosWriteQitem(pVnode->wqueue, qtype, pWrite);
  return TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249
}