vnodeWrite.c 7.1 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tqueue.h"
#include "trpc.h"
S
slguan 已提交
22
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24
#include "tsdb.h"
#include "twal.h"
S
slguan 已提交
25
#include "tdataformat.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27
#include "vnode.h"
#include "vnodeInt.h"
J
jtao1735 已提交
28
#include "tcq.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29

J
Jeff Tao 已提交
30
// Dispatch table indexed by message type: each slot holds the handler for that
// write message. Slots are filled in by vnodeInitWriteFp(); as a static array,
// any slot that is never assigned stays NULL, which vnodeProcessWrite() treats
// as "no handler".
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
H
TD-90  
Hongze Cheng 已提交
31 32 33 34 35 36
// Per-message-type write handlers, defined later in this file. They all share
// the signature of the vnodeProcessWriteMsgFp slots: the vnode, the message
// content, and a response holder.
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37

38
void vnodeInitWriteFp(void) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39 40 41 42 43
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT]          = vnodeProcessSubmitMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE]   = vnodeProcessDropTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE]  = vnodeProcessAlterTableMsg;
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE]  = vnodeProcessDropStableMsg;
H
TD-90  
Hongze Cheng 已提交
44
  vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL]  = vnodeProcessUpdateTagValMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45 46
}

S
slguan 已提交
47
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
  int32_t    code = 0;
S
slguan 已提交
49
  SVnodeObj *pVnode = (SVnodeObj *)param1;
S
TD-1652  
Shengliang Guan 已提交
50
  SWalHead * pHead = param2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51

S
add log  
Shengliang Guan 已提交
52 53
  if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
S
TD-1652  
Shengliang Guan 已提交
54
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
S
add log  
Shengliang Guan 已提交
55
  }
J
Jeff Tao 已提交
56

57
  if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
S
add log  
Shengliang Guan 已提交
58
    vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
59 60 61
    return TSDB_CODE_VND_NO_WRITE_AUTH;
  }

S
TD-1652  
Shengliang Guan 已提交
62
  // tsdb may be in reset state
S
Shengliang Guan 已提交
63
  if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
S
TD-1652  
Shengliang Guan 已提交
64 65 66
  if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;

  if (pHead->version == 0) {  // from client or CQ
S
add log  
Shengliang Guan 已提交
67
    if (pVnode->status != TAOS_VN_STATUS_READY) {
S
TD-1652  
Shengliang Guan 已提交
68 69
      vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->status);
S
Shengliang Guan 已提交
70
      return TSDB_CODE_APP_NOT_READY;  // it may be in deleting or closing state
S
add log  
Shengliang Guan 已提交
71
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72

73
    if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
S
TD-1652  
Shengliang Guan 已提交
74 75
      vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
             pVnode->syncCfg.replica, syncRole[pVnode->role]);
S
Shengliang Guan 已提交
76
      return TSDB_CODE_APP_NOT_READY;
S
add log  
Shengliang Guan 已提交
77
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79 80 81

    // assign version
    pVnode->version++;
    pHead->version = pVnode->version;
S
TD-1652  
Shengliang Guan 已提交
82
    if (pVnode->delay) usleep(pVnode->delay * 1000);
83

S
TD-1652  
Shengliang Guan 已提交
84
  } else {  // from wal or forward
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
85 86 87 88 89 90 91 92 93 94
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= pVnode->version) return 0;
  }

  pVnode->version = pHead->version;

  // write into WAL
  code = walWrite(pVnode->wal, pHead);
  if (code < 0) return code;

S
TD-1652  
Shengliang Guan 已提交
95
  // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
96
  int32_t syncCode = 0;
97
  syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
98 99
  if (syncCode < 0) return syncCode;

S
TD-1652  
Shengliang Guan 已提交
100
  // write data locally
101 102
  code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
  if (code < 0) return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103

104
  return syncCode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105 106
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107 108 109 110 111
// Relays a forward confirmation for the given version and result code to the
// sync handle of the vnode passed in as param.
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
  SVnodeObj *pNode = param;

  syncConfirmForward(pNode->sync, version, code);
}

J
Jeff Tao 已提交
112
// Handles a submit message by inserting its content into the vnode's tsdb.
//
// pVnode: target vnode.
// pCont:  message content handed to tsdbInsertData().
// pRet:   optional response holder. When non-NULL, a SShellSubmitRspMsg buffer
//         is allocated with rpcMallocCont() and passed to tsdbInsertData().
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbInsertData() returns < 0.
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  int32_t code = TSDB_CODE_SUCCESS;

  vTrace("vgId:%d, submit msg is processed", pVnode->vgId);

  // save insert result into item
  SShellSubmitRspMsg *pRsp = NULL;
  if (pRet) {
    pRet->len = sizeof(SShellSubmitRspMsg);
    pRet->rsp = rpcMallocCont(pRet->len);
    if (pRet->rsp == NULL) {
      // Allocation failed: keep len consistent with the missing buffer. The
      // insert still runs with a NULL response, the same as when pRet is NULL.
      pRet->len = 0;
    }
    pRsp = pRet->rsp;
  }

  if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;

  return code;
}

J
Jeff Tao 已提交
130
// Handles a create-table message: builds a table configuration from the
// message, creates the table in the vnode's tsdb and releases the
// configuration again.
//
// Returns TSDB_CODE_SUCCESS, or terrno when building the configuration or
// creating the table fails. pRet is not used.
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDCreateTableMsg *pCreate = (SMDCreateTableMsg *)pCont;
  STableCfg *        pTableCfg = tsdbCreateTableCfgFromMsg(pCreate);

  if (pTableCfg == NULL) {
    // a failure is expected to have set terrno
    ASSERT(terrno != 0);
    return terrno;
  }

  int result = TSDB_CODE_SUCCESS;
  if (tsdbCreateTable(pVnode->tsdb, pTableCfg) < 0) {
    result = terrno;
    ASSERT(result != 0);
  }

  // the configuration is released on both the success and the failure path
  tsdbClearTableCfg(pTableCfg);
  return result;
}

J
Jeff Tao 已提交
148
// Handles a drop-table message: drops the table identified by the message's
// uid and sid from the vnode's tsdb.
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable() returns < 0.
// pRet is not used.
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropTableMsg *pDrop = pCont;

  vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pDrop->tableId);

  // uid and sid are converted with htobe64()/htonl() before being used as the table id
  STableId target = {.uid = htobe64(pDrop->uid), .tid = htonl(pDrop->sid)};

  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
160
// Handles an alter-table message. Nothing is applied here yet: the message is
// only logged and success is returned. pCont and pRet are not used.
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  // TODO: per the original note, altering is to be disposed in tsdb; a
  // create-table-style call sequence was sketched here but never enabled.
  vDebug("vgId:%d, alter table msg is received", pVnode->vgId);

  return TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
171
// Handles a drop-stable message: drops the table identified by the message's
// uid (with tid set to -1) from the vnode's tsdb and logs the outcome.
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbDropTable() returns < 0.
// pRet is not used.
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SMDDropSTableMsg *pStable = pCont;
  int32_t           result = TSDB_CODE_SUCCESS;

  vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pStable->tableId);

  // only the uid comes from the message; tid is fixed to -1
  STableId target = {.uid = htobe64(pStable->uid), .tid = -1};

  if (tsdbDropTable(pVnode->tsdb, target) < 0) {
    result = terrno;
  }

  vDebug("vgId:%d, stable:%s, drop stable result:%s", pVnode->vgId, pStable->tableId, tstrerror(result));

  return result;
}

H
TD-90  
Hongze Cheng 已提交
186
// Handles an update-tag-value message by passing its content to
// tsdbUpdateTableTagValue() for the vnode's tsdb.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the update returns < 0.
// pRet is not used.
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
  SUpdateTableTagValMsg *pUpdate = (SUpdateTableTagValMsg *)pCont;

  return (tsdbUpdateTableTagValue(pVnode->tsdb, pUpdate) < 0) ? terrno : TSDB_CODE_SUCCESS;
}

J
Jeff Tao 已提交
193
int vnodeWriteToQueue(void *param, void *data, int type) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194
  SVnodeObj *pVnode = param;
S
TD-1652  
Shengliang Guan 已提交
195
  SWalHead * pHead = data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197 198 199 200

  int size = sizeof(SWalHead) + pHead->len;
  SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
  memcpy(pWal, pHead, size);

J
Jeff Tao 已提交
201
  atomic_add_fetch_32(&pVnode->refCount, 1);
S
Shengliang Guan 已提交
202 203
  vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);

J
Jeff Tao 已提交
204
  taosWriteQitem(pVnode->wqueue, type, pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205 206 207

  return 0;
}