dnodeVWrite.c 8.2 KB
Newer Older
S
#1177  
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "os.h"
S
slguan 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "taoserror.h"
20
#include "tutil.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "trpc.h"
S
[TD-10]  
slguan 已提交
23
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24
#include "twal.h"
S
Shengliang Guan 已提交
25
#include "tdataformat.h"
S
slguan 已提交
26 27
#include "tglobal.h"
#include "vnode.h"
28 29
#include "dnodeInt.h"
#include "dnodeVWrite.h"
30
#include "dnodeMgmt.h"
S
slguan 已提交
31

S
slguan 已提交
32
typedef struct {
J
Jeff Tao 已提交
33
  taos_qall  qall;
S
slguan 已提交
34
  taos_qset  qset;      // queue set
Y
yifan hao 已提交
35
  pthread_t  thread;    // thread
S
slguan 已提交
36
  int32_t    workerId;  // worker ID
Y
yifan hao 已提交
37
} SWriteWorker;
S
slguan 已提交
38

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
typedef struct {
J
Jeff Tao 已提交
40
  SRspRet  rspRet;
41 42
  int32_t  processedCount;
  int32_t  code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44 45 46 47
  void    *pCont;
  int32_t  contLen;
  SRpcMsg  rpcMsg;
} SWriteMsg;

J
Jeff Tao 已提交
48
typedef struct {
S
slguan 已提交
49 50
  int32_t        max;        // max number of workers
  int32_t        nextId;     // from 0 to max-1, cyclic
S
slguan 已提交
51
  SWriteWorker  *writeWorker;
52
  pthread_mutex_t mutex;
S
slguan 已提交
53 54
} SWriteWorkerPool;

55 56
static void *dnodeProcessWriteQueue(void *param);
static void  dnodeHandleIdleWorker(SWriteWorker *pWorker);
S
slguan 已提交
57 58 59

SWriteWorkerPool wWorkerPool;

60
int32_t dnodeInitVnodeWrite() {
S
slguan 已提交
61 62 63
  wWorkerPool.max = tsNumOfCores;
  wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
  if (wWorkerPool.writeWorker == NULL) return -1;
64
  pthread_mutex_init(&wWorkerPool.mutex, NULL);
S
slguan 已提交
65

S
slguan 已提交
66
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
S
slguan 已提交
67
    wWorkerPool.writeWorker[i].workerId = i;
S
slguan 已提交
68 69
  }

70
  dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
S
slguan 已提交
71
  return 0;
S
slguan 已提交
72 73
}

74
void dnodeCleanupVnodeWrite() {
75
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
76
    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
77 78 79 80
    if (pWorker->thread) {
      taosQsetThreadResume(pWorker->qset);
    }
  }
81

J
Jeff Tao 已提交
82
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
83
    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
J
Jeff Tao 已提交
84 85
    if (pWorker->thread) {
      pthread_join(pWorker->thread, NULL);
86 87
      taosFreeQall(pWorker->qall);
      taosCloseQset(pWorker->qset);
J
Jeff Tao 已提交
88 89 90
    }
  }

91
  pthread_mutex_destroy(&wWorkerPool.mutex);
S
slguan 已提交
92
  free(wWorkerPool.writeWorker);
93
  dInfo("dnode write is closed");
S
slguan 已提交
94 95
}

96
/*
 * Route an incoming write RPC to the write queue of the vgroup it targets.
 *
 * The SMsgDesc prefix (submit messages only) and the SMsgHead are converted to
 * host byte order in place. If no write queue exists for the vgroup, the client
 * gets TSDB_CODE_VND_INVALID_VGROUP_ID and the message buffer is freed here.
 * Otherwise ownership of the buffer passes to the queued SWriteMsg.
 */
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
  char *pBody = (char *)pMsg->pCont;

  // submit messages carry an SMsgDesc before the per-vnode payload
  if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    SMsgDesc *pDesc = (SMsgDesc *)pBody;
    pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
    pBody += sizeof(SMsgDesc);
  }

  SMsgHead *pHead = (SMsgHead *)pBody;
  pHead->vgId    = htonl(pHead->vgId);
  pHead->contLen = htonl(pHead->contLen);

  taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
  if (queue == NULL) {
    SRpcMsg rpcRsp = {
      .handle  = pMsg->handle,
      .pCont   = NULL,
      .contLen = 0,
      .code    = TSDB_CODE_VND_INVALID_VGROUP_ID,
      .msgType = 0
    };
    rpcSendResponse(&rpcRsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  // NOTE(review): the result of taosAllocateQitem is not checked for NULL.
  SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
  pWrite->rpcMsg  = *pMsg;
  pWrite->pCont   = pBody;
  pWrite->contLen = pHead->contLen;

  taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
}
S
slguan 已提交
130

S
Shengliang Guan 已提交
131
void *dnodeAllocateVnodeWqueue(void *pVnode) {
132
  pthread_mutex_lock(&wWorkerPool.mutex);
S
slguan 已提交
133
  SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
J
Jeff Tao 已提交
134
  void *queue = taosOpenQueue();
135 136 137 138
  if (queue == NULL) {
    pthread_mutex_unlock(&wWorkerPool.mutex);
    return NULL;
  }
S
slguan 已提交
139 140 141

  if (pWorker->qset == NULL) {
    pWorker->qset = taosOpenQset();
S
Shuduo Sang 已提交
142 143
    if (pWorker->qset == NULL) {
      taosCloseQueue(queue);
144
      pthread_mutex_unlock(&wWorkerPool.mutex);
S
Shuduo Sang 已提交
145 146
      return NULL;
    }
S
slguan 已提交
147

148
    taosAddIntoQset(pWorker->qset, queue, pVnode);
J
Jeff Tao 已提交
149
    pWorker->qall = taosAllocateQall();
Y
yifan hao 已提交
150 151 152
    if (pWorker->qall == NULL) {
      taosCloseQset(pWorker->qset);
      taosCloseQueue(queue);
153
      pthread_mutex_unlock(&wWorkerPool.mutex);
Y
yifan hao 已提交
154 155
      return NULL;
    }
S
slguan 已提交
156 157 158 159 160 161
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
Y
yifan hao 已提交
162
      taosFreeQall(pWorker->qall);
S
slguan 已提交
163
      taosCloseQset(pWorker->qset);
Y
yifan hao 已提交
164 165
      taosCloseQueue(queue);
      queue = NULL;
J
Jeff Tao 已提交
166
    } else {
167
      dDebug("write worker:%d is launched", pWorker->workerId);
168
      wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
169
    }
J
Jeff Tao 已提交
170 171

    pthread_attr_destroy(&thAttr);
S
slguan 已提交
172
  } else {
173
    taosAddIntoQset(pWorker->qset, queue, pVnode);
S
slguan 已提交
174
    wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
175
  }
S
slguan 已提交
176

177
  pthread_mutex_unlock(&wWorkerPool.mutex);
178
  dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179

S
slguan 已提交
180 181 182
  return queue;
}

S
Shengliang Guan 已提交
183
/*
 * Close a vnode write queue previously returned by dnodeAllocateVnodeWqueue.
 * The worker that served the queue keeps running.
 */
void dnodeFreeVnodeWqueue(void *wqueue) {
  taosCloseQueue(wqueue);

  // TODO: dynamically adjust the number of threads
}

S
Shengliang Guan 已提交
189
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191
  SWriteMsg *pWrite = (SWriteMsg *)param;

192 193 194 195
  if (code < 0) pWrite->code = code;
  int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);

  if (count <= 1) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197 198

  SRpcMsg rpcRsp = {
    .handle  = pWrite->rpcMsg.handle,
J
Jeff Tao 已提交
199 200
    .pCont   = pWrite->rspRet.rsp,
    .contLen = pWrite->rspRet.len,
201
    .code    = pWrite->code,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203 204 205 206 207 208 209 210
  };

  rpcSendResponse(&rpcRsp);
  rpcFreeCont(pWrite->rpcMsg.pCont);
  taosFreeQitem(pWrite);

  vnodeRelease(pVnode);
}

S
slguan 已提交
211 212
static void *dnodeProcessWriteQueue(void *param) {
  SWriteWorker *pWorker = (SWriteWorker *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
213 214
  SWriteMsg    *pWrite;
  SWalHead     *pHead;
S
slguan 已提交
215
  int32_t       numOfMsgs;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
  int           type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
  void         *pVnode, *item;
218
  SRspRet      *pRspRet;
S
slguan 已提交
219

220 221
  dDebug("write worker:%d is running", pWorker->workerId);

S
slguan 已提交
222
  while (1) {
J
Jeff Tao 已提交
223
    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
224
    if (numOfMsgs == 0) {
225
      dDebug("dnodeProcessWriteQueee: got no message from qset, exiting...");
226
      break;
S
slguan 已提交
227 228
    }

S
slguan 已提交
229
    for (int32_t i = 0; i < numOfMsgs; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
      pWrite = NULL;
231
      pRspRet = NULL;
J
Jeff Tao 已提交
232
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233 234
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
235
        pRspRet = &pWrite->rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236 237 238 239
        pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
        pHead->msgType = pWrite->rpcMsg.msgType;
        pHead->version = 0;
        pHead->len = pWrite->contLen;
S
Shengliang Guan 已提交
240
        dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242
      } else {
        pHead = (SWalHead *)item;
S
log  
Shengliang Guan 已提交
243
        dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244
      }
S
slguan 已提交
245

246
      int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
247 248 249 250
      if (pWrite) { 
        pWrite->rpcMsg.code = code;
        if (code <= 0) pWrite->processedCount = 1; 
      }
S
slguan 已提交
251
    }
S
slguan 已提交
252

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253
    walFsync(vnodeGetWal(pVnode));
S
slguan 已提交
254 255

    // browse all items, and process them one by one
J
Jeff Tao 已提交
256
    taosResetQitems(pWorker->qall);
S
slguan 已提交
257
    for (int32_t i = 0; i < numOfMsgs; ++i) {
J
Jeff Tao 已提交
258
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
S
Shengliang Guan 已提交
261
        dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263 264 265 266
      } else if (type == TAOS_QTYPE_FWD) {
        pHead = (SWalHead *)item;
        vnodeConfirmForward(pVnode, pHead->version, 0);
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
267
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268 269
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
270 271 272 273 274
      }
    }
  }

  return NULL;
S
slguan 已提交
275 276
}

277
UNUSED_FUNC
S
slguan 已提交
278
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
S
slguan 已提交
279
  int32_t num = taosGetQueueNumber(pWorker->qset);
S
slguan 已提交
280 281

  if (num > 0) {
282
     usleep(30000);
Y
yifan hao 已提交
283
     sched_yield();
S
slguan 已提交
284
  } else {
J
Jeff Tao 已提交
285
     taosFreeQall(pWorker->qall);
S
slguan 已提交
286 287
     taosCloseQset(pWorker->qset);
     pWorker->qset = NULL;
288
     dDebug("write worker:%d is released", pWorker->workerId);
S
slguan 已提交
289 290 291
     pthread_exit(NULL);
  }
}