vnodeWrite.c 7.0 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
S
slguan 已提交
22
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
#include "tsdb.h"
#include "twal.h"
S
slguan 已提交
25
#include "tdataformat.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27
#include "vnode.h"
#include "vnodeInt.h"
J
jtao1735 已提交
28
#include "tcq.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29

J
Jeff Tao 已提交
30
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
H
TD-90  
Hongze Cheng 已提交
31 32 33 34 35 36
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37

38
void vnodeInitWriteFp(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39 40 41 42 43
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
44
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45 46
}

S
slguan 已提交
47
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
  int32_t    code = 0;
S
slguan 已提交
49
  SVnodeObj *pVnode = (SVnodeObj *)param1;
S
TD-1652  
Shengliang Guan 已提交
50
  SWalHead * pHead = param2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51

S
add log  
Shengliang Guan 已提交
52 53
  if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
S
TD-1652  
Shengliang Guan 已提交
54
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
S
add log  
Shengliang Guan 已提交
55
  }
J
Jeff Tao 已提交
56

57
  if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
S
add log  
Shengliang Guan 已提交
58
    vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
59 60 61
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

S
TD-1652  
Shengliang Guan 已提交
62
  // tsdb may be in reset state
S
Shengliang Guan 已提交
63
  if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
S
TD-1652  
Shengliang Guan 已提交
64 65 66
  if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;

  if (pHead->version == 0) {  // from client or CQ
S
add log  
Shengliang Guan 已提交
67
    if (pVnode->status != TAOS_VN_STATUS_READY) {
S
TD-1652  
Shengliang Guan 已提交
68 69
      vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->status);
S
Shengliang Guan 已提交
70
      return TSDB_CODE_APP_NOT_READY;  // it may be in deleting or closing state
S
add log  
Shengliang Guan 已提交
71
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72

73
    if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
S
TD-1652  
Shengliang Guan 已提交
74 75
      vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->syncCfg.replica, syncRole[pVnode->role]);
S
Shengliang Guan 已提交
76
      return TSDB_CODE_APP_NOT_READY;
S
add log  
Shengliang Guan 已提交
77
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79

    // assign version
S
TD-1652  
Shengliang Guan 已提交
80
    pHead->version = pVnode->version + 1;
S
TD-1652  
Shengliang Guan 已提交
81
    if (pVnode->delay) usleep(pVnode->delay * 1000);
82

S
TD-1652  
Shengliang Guan 已提交
83
  } else {  // from wal or forward
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84 85 86 87
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= pVnode->version) return 0;
  }

S
TD-1652  
Shengliang Guan 已提交
88 89 90 91
  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
  int32_t syncCode = 0;
  syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
  if (syncCode < 0) return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92 93 94 95 96

  // write into WAL
  code = walWrite(pVnode->wal, pHead);
  if (code < 0) return code;

S
TD-1652  
Shengliang Guan 已提交
97
  pVnode->version = pHead->version;
98

S
TD-1652  
Shengliang Guan 已提交
99
  // write data locally
100 101
  code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
  if (code < 0) return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102

103
  return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104 105
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106 107 108 109 110
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
  syncConfirmForward(pVnode->sync, version, code);
}

J
Jeff Tao 已提交
111
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
TD-353  
Hongze Cheng 已提交
112
  int32_t code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113

114 115
  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
  // save insert result into item
117
  SShellSubmitRspMsg *pRsp = NULL;
S
TD-1652  
Shengliang Guan 已提交
118
  if (pRet) {
119 120 121 122
    pRet->len = sizeof(SShellSubmitRspMsg);
    pRet->rsp = rpcMallocCont(pRet->len);
    pRsp = pRet->rsp;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123

H
TD-353  
Hongze Cheng 已提交
124
  if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
S
TD-1652  
Shengliang Guan 已提交
125

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126 127 128
  return code;
}

J
Jeff Tao 已提交
129
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
TD-353  
Hongze Cheng 已提交
130
  int code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
131 132

  STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
S
TD-1415  
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141
  if (pCfg == NULL) {
    ASSERT(terrno != 0);
    return terrno;
  }

  if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) {
    code = terrno;
    ASSERT(code != 0);
  }
H
Hongze Cheng 已提交
142 143 144

  tsdbClearTableCfg(pCfg);
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146
}

J
Jeff Tao 已提交
147
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148
  SMDDropTableMsg *pTable = pCont;
H
TD-353  
Hongze Cheng 已提交
149
  int32_t          code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150

151
  vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pTable->tableId);
H
TD-353  
Hongze Cheng 已提交
152
  STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->sid)};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153

H
TD-353  
Hongze Cheng 已提交
154
  if (tsdbDropTable(pVnode->tsdb, tableId) < 0) code = terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156 157 158

  return code;
}

J
Jeff Tao 已提交
159
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
160 161 162 163 164 165
  // TODO: disposed in tsdb
  // STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
  // if (pCfg == NULL) return terrno;
  // if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) code = terrno;

  // tsdbClearTableCfg(pCfg);
166
  vDebug("vgId:%d, alter table msg is received", pVnode->vgId);
H
Hongze Cheng 已提交
167
  return TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168 169
}

J
Jeff Tao 已提交
170
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171
  SMDDropSTableMsg *pTable = pCont;
H
TD-353  
Hongze Cheng 已提交
172
  int32_t           code = TSDB_CODE_SUCCESS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173

174
  vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pTable->tableId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

H
TD-353  
Hongze Cheng 已提交
176 177 178 179
  STableId stableId = {.uid = htobe64(pTable->uid), .tid = -1};

  if (tsdbDropTable(pVnode->tsdb, stableId) < 0) code = terrno;

180
  vDebug("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pTable->tableId, tstrerror(code));
H
TD-353  
Hongze Cheng 已提交
181

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182 183 184
  return code;
}

H
TD-90  
Hongze Cheng 已提交
185
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
H
Hongze Cheng 已提交
186
  if (tsdbUpdateTableTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) {
H
TD-353  
Hongze Cheng 已提交
187 188 189
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
H
TD-90  
Hongze Cheng 已提交
190 191
}

J
Jeff Tao 已提交
192
int vnodeWriteToQueue(void *param, void *data, int type) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193
  SVnodeObj *pVnode = param;
S
TD-1652  
Shengliang Guan 已提交
194
  SWalHead * pHead = data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195 196 197 198 199

  int size = sizeof(SWalHead) + pHead->len;
  SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
  memcpy(pWal, pHead, size);

J
Jeff Tao 已提交
200
  atomic_add_fetch_32(&pVnode->refCount, 1);
S
Shengliang Guan 已提交
201 202
  vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);

J
Jeff Tao 已提交
203
  taosWriteQitem(pVnode->wqueue, type, pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204 205 206

  return 0;
}