dnodeVWrite.c 8.0 KB
Newer Older
S
#1177  
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "os.h"
S
slguan 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "taoserror.h"
20
#include "tutil.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "trpc.h"
S
[TD-10]  
slguan 已提交
23
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24
#include "twal.h"
S
Shengliang Guan 已提交
25
#include "tdataformat.h"
S
slguan 已提交
26 27
#include "tglobal.h"
#include "vnode.h"
28 29
#include "dnodeInt.h"
#include "dnodeVWrite.h"
30
#include "dnodeMgmt.h"
S
slguan 已提交
31

S
slguan 已提交
32
typedef struct {
J
Jeff Tao 已提交
33
  taos_qall  qall;
S
slguan 已提交
34
  taos_qset  qset;      // queue set
Y
yifan hao 已提交
35
  pthread_t  thread;    // thread
S
slguan 已提交
36
  int32_t    workerId;  // worker ID
Y
yifan hao 已提交
37
} SWriteWorker;
S
slguan 已提交
38

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
typedef struct {
J
Jeff Tao 已提交
40
  SRspRet  rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42 43 44 45
  void    *pCont;
  int32_t  contLen;
  SRpcMsg  rpcMsg;
} SWriteMsg;

J
Jeff Tao 已提交
46
typedef struct {
S
slguan 已提交
47 48
  int32_t        max;        // max number of workers
  int32_t        nextId;     // from 0 to max-1, cyclic
S
slguan 已提交
49
  SWriteWorker  *writeWorker;
50
  pthread_mutex_t mutex;
S
slguan 已提交
51 52
} SWriteWorkerPool;

53 54
// Thread entry point of a write worker; param is the worker's SWriteWorker.
static void *dnodeProcessWriteQueue(void *param);
// Idle-worker handler; its definition in this file is tagged UNUSED_FUNC.
static void  dnodeHandleIdleWorker(SWriteWorker *pWorker);
S
slguan 已提交
55 56 57

// The write-worker pool of this dnode; set up by dnodeInitVnodeWrite().
SWriteWorkerPool wWorkerPool;

58
int32_t dnodeInitVnodeWrite() {
S
slguan 已提交
59 60 61
  wWorkerPool.max = tsNumOfCores;
  wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
  if (wWorkerPool.writeWorker == NULL) return -1;
62
  pthread_mutex_init(&wWorkerPool.mutex, NULL);
S
slguan 已提交
63

S
slguan 已提交
64
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
S
slguan 已提交
65
    wWorkerPool.writeWorker[i].workerId = i;
S
slguan 已提交
66 67
  }

68
  dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
S
slguan 已提交
69
  return 0;
S
slguan 已提交
70 71
}

72
void dnodeCleanupVnodeWrite() {
73
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
74
    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
75 76 77 78
    if (pWorker->thread) {
      taosQsetThreadResume(pWorker->qset);
    }
  }
79

J
Jeff Tao 已提交
80
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
81
    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
J
Jeff Tao 已提交
82 83
    if (pWorker->thread) {
      pthread_join(pWorker->thread, NULL);
84 85
      taosFreeQall(pWorker->qall);
      taosCloseQset(pWorker->qset);
J
Jeff Tao 已提交
86 87 88
    }
  }

89
  pthread_mutex_destroy(&wWorkerPool.mutex);
S
slguan 已提交
90
  free(wWorkerPool.writeWorker);
91
  dInfo("dnode write is closed");
S
slguan 已提交
92 93
}

94
/*
 * Route an incoming RPC write message to the write queue of its vnode.
 *
 * The message head (preceded by an SMsgDesc for TSDB_MSG_TYPE_SUBMIT) is
 * converted in place with htonl(). The vgId found in the head selects the
 * queue through vnodeAcquireWqueue(). When no queue is returned, an error
 * response carrying TSDB_CODE_VND_INVALID_VGROUP_ID is sent and the request
 * buffer is freed. Otherwise the message is wrapped in an SWriteMsg and
 * queued as TAOS_QTYPE_RPC; the buffer is then released by
 * dnodeSendRpcVnodeWriteRsp().
 */
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
  char *pBody = (char *)pMsg->pCont;

  // a submit message carries an SMsgDesc in front of the message head
  if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    SMsgDesc *pDesc = (SMsgDesc *)pBody;
    pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
    pBody += sizeof(SMsgDesc);
  }

  // convert the head fields in place; htonl() and ntohl() perform the same swap
  SMsgHead *pHead = (SMsgHead *)pBody;
  pHead->vgId    = htonl(pHead->vgId);
  pHead->contLen = htonl(pHead->contLen);

  taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
  if (!queue) {
    // no write queue for this vgId: reject the request right away
    SRpcMsg rpcRsp = {
      .msgType = 0,
      .code    = TSDB_CODE_VND_INVALID_VGROUP_ID,
      .contLen = 0,
      .pCont   = NULL,
      .handle  = pMsg->handle
    };
    rpcSendResponse(&rpcRsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  // put message into queue
  // NOTE(review): the result of taosAllocateQitem() is used without a NULL
  // check - confirm whether it can fail.
  SWriteMsg *pItem = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
  pItem->contLen = pHead->contLen;
  pItem->pCont   = pBody;
  pItem->rpcMsg  = *pMsg;

  taosWriteQitem(queue, TAOS_QTYPE_RPC, pItem);
}
S
slguan 已提交
128

S
Shengliang Guan 已提交
129
void *dnodeAllocateVnodeWqueue(void *pVnode) {
130
  pthread_mutex_lock(&wWorkerPool.mutex);
S
slguan 已提交
131
  SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
J
Jeff Tao 已提交
132
  void *queue = taosOpenQueue();
133 134 135 136
  if (queue == NULL) {
    pthread_mutex_unlock(&wWorkerPool.mutex);
    return NULL;
  }
S
slguan 已提交
137 138 139

  if (pWorker->qset == NULL) {
    pWorker->qset = taosOpenQset();
S
Shuduo Sang 已提交
140 141
    if (pWorker->qset == NULL) {
      taosCloseQueue(queue);
142
      pthread_mutex_unlock(&wWorkerPool.mutex);
S
Shuduo Sang 已提交
143 144
      return NULL;
    }
S
slguan 已提交
145

146
    taosAddIntoQset(pWorker->qset, queue, pVnode);
J
Jeff Tao 已提交
147
    pWorker->qall = taosAllocateQall();
Y
yifan hao 已提交
148 149 150
    if (pWorker->qall == NULL) {
      taosCloseQset(pWorker->qset);
      taosCloseQueue(queue);
151
      pthread_mutex_unlock(&wWorkerPool.mutex);
Y
yifan hao 已提交
152 153
      return NULL;
    }
S
slguan 已提交
154 155 156 157 158 159
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
Y
yifan hao 已提交
160
      taosFreeQall(pWorker->qall);
S
slguan 已提交
161
      taosCloseQset(pWorker->qset);
Y
yifan hao 已提交
162 163
      taosCloseQueue(queue);
      queue = NULL;
J
Jeff Tao 已提交
164
    } else {
165
      dDebug("write worker:%d is launched", pWorker->workerId);
166
      wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
167
    }
J
Jeff Tao 已提交
168 169

    pthread_attr_destroy(&thAttr);
S
slguan 已提交
170
  } else {
171
    taosAddIntoQset(pWorker->qset, queue, pVnode);
S
slguan 已提交
172
    wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
173
  }
S
slguan 已提交
174

175
  pthread_mutex_unlock(&wWorkerPool.mutex);
176
  dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

S
slguan 已提交
178 179 180
  return queue;
}

S
Shengliang Guan 已提交
181
/*
 * Close a vnode write queue.
 *
 * wqueue is passed straight to taosCloseQueue(). The number of worker threads
 * is not adjusted here.
 */
void dnodeFreeVnodeWqueue(void *wqueue) {
  taosCloseQueue(wqueue);

  // TODO: dynamically adjust the number of threads
}

S
Shengliang Guan 已提交
187
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188 189 190 191 192 193
  SWriteMsg *pWrite = (SWriteMsg *)param;

  if (code > 0) return;

  SRpcMsg rpcRsp = {
    .handle  = pWrite->rpcMsg.handle,
J
Jeff Tao 已提交
194 195
    .pCont   = pWrite->rspRet.rsp,
    .contLen = pWrite->rspRet.len,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197 198 199 200 201 202 203 204 205
    .code    = code,
  };

  rpcSendResponse(&rpcRsp);
  rpcFreeCont(pWrite->rpcMsg.pCont);
  taosFreeQitem(pWrite);

  vnodeRelease(pVnode);
}

S
slguan 已提交
206 207
static void *dnodeProcessWriteQueue(void *param) {
  SWriteWorker *pWorker = (SWriteWorker *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
208 209
  SWriteMsg    *pWrite;
  SWalHead     *pHead;
S
slguan 已提交
210
  int32_t       numOfMsgs;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211
  int           type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
  void         *pVnode, *item;
213
  SRspRet      *pRspRet;
S
slguan 已提交
214

215 216
  dDebug("write worker:%d is running", pWorker->workerId);

S
slguan 已提交
217
  while (1) {
J
Jeff Tao 已提交
218
    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
219
    if (numOfMsgs == 0) {
220
      dDebug("dnodeProcessWriteQueee: got no message from qset, exiting...");
221
      break;
S
slguan 已提交
222 223
    }

S
slguan 已提交
224
    for (int32_t i = 0; i < numOfMsgs; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225
      pWrite = NULL;
226
      pRspRet = NULL;
J
Jeff Tao 已提交
227
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
230
        pRspRet = &pWrite->rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231 232 233 234
        pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
        pHead->msgType = pWrite->rpcMsg.msgType;
        pHead->version = 0;
        pHead->len = pWrite->contLen;
S
Shengliang Guan 已提交
235
        dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236 237
      } else {
        pHead = (SWalHead *)item;
S
log  
Shengliang Guan 已提交
238
        dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
      }
S
slguan 已提交
240

241
      int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242
      if (pWrite) pWrite->rpcMsg.code = code;
S
slguan 已提交
243
    }
S
slguan 已提交
244

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
    walFsync(vnodeGetWal(pVnode));
S
slguan 已提交
246 247

    // browse all items, and process them one by one
J
Jeff Tao 已提交
248
    taosResetQitems(pWorker->qall);
S
slguan 已提交
249
    for (int32_t i = 0; i < numOfMsgs; ++i) {
J
Jeff Tao 已提交
250
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251 252
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
S
Shengliang Guan 已提交
253
        dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254 255 256 257 258
      } else if (type == TAOS_QTYPE_FWD) {
        pHead = (SWalHead *)item;
        vnodeConfirmForward(pVnode, pHead->version, 0);
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
259
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260 261
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
262 263 264 265 266
      }
    }
  }

  return NULL;
S
slguan 已提交
267 268
}

269
UNUSED_FUNC
S
slguan 已提交
270
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
S
slguan 已提交
271
  int32_t num = taosGetQueueNumber(pWorker->qset);
S
slguan 已提交
272 273

  if (num > 0) {
274
     usleep(30000);
Y
yifan hao 已提交
275
     sched_yield();
S
slguan 已提交
276
  } else {
J
Jeff Tao 已提交
277
     taosFreeQall(pWorker->qall);
S
slguan 已提交
278 279
     taosCloseQset(pWorker->qset);
     pWorker->qset = NULL;
280
     dDebug("write worker:%d is released", pWorker->workerId);
S
slguan 已提交
281 282 283
     pthread_exit(NULL);
  }
}