dnodeVWrite.c 7.1 KB
Newer Older
S
#1177  
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "os.h"
S
slguan 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "taoserror.h"
20
#include "tutil.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "trpc.h"
S
[TD-10]  
slguan 已提交
23
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24
#include "twal.h"
S
Shengliang Guan 已提交
25
#include "tdataformat.h"
S
slguan 已提交
26 27
#include "tglobal.h"
#include "vnode.h"
28 29
#include "dnodeInt.h"
#include "dnodeVWrite.h"
30
#include "dnodeMgmt.h"
S
slguan 已提交
31

S
slguan 已提交
32
typedef struct {
J
Jeff Tao 已提交
33
  taos_qall  qall;
S
slguan 已提交
34
  taos_qset  qset;      // queue set
Y
yifan hao 已提交
35
  pthread_t  thread;    // thread
S
slguan 已提交
36
  int32_t    workerId;  // worker ID
Y
yifan hao 已提交
37
} SWriteWorker;
S
slguan 已提交
38

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
typedef struct {
J
Jeff Tao 已提交
40
  SRspRet  rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42 43 44 45
  void    *pCont;
  int32_t  contLen;
  SRpcMsg  rpcMsg;
} SWriteMsg;

J
Jeff Tao 已提交
46
typedef struct {
S
slguan 已提交
47 48
  int32_t        max;        // max number of workers
  int32_t        nextId;     // from 0 to max-1, cyclic
S
slguan 已提交
49 50 51
  SWriteWorker  *writeWorker;
} SWriteWorkerPool;

52 53
static void *dnodeProcessWriteQueue(void *param);
static void  dnodeHandleIdleWorker(SWriteWorker *pWorker);
S
slguan 已提交
54 55 56

SWriteWorkerPool wWorkerPool;

57
int32_t dnodeInitVnodeWrite() {
S
slguan 已提交
58 59 60 61
  wWorkerPool.max = tsNumOfCores;
  wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
  if (wWorkerPool.writeWorker == NULL) return -1;

S
slguan 已提交
62
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
S
slguan 已提交
63
    wWorkerPool.writeWorker[i].workerId = i;
S
slguan 已提交
64 65
  }

S
slguan 已提交
66
  dPrint("dnode write is opened");
S
slguan 已提交
67
  return 0;
S
slguan 已提交
68 69
}

70
void dnodeCleanupVnodeWrite() {
71 72 73 74 75 76
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
    SWriteWorker *pWorker =  wWorkerPool.writeWorker + i;
    if (pWorker->thread) {
      taosQsetThreadResume(pWorker->qset);
    }
  }
J
Jeff Tao 已提交
77 78 79 80 81
  
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
    SWriteWorker *pWorker =  wWorkerPool.writeWorker + i;
    if (pWorker->thread) {
      pthread_join(pWorker->thread, NULL);
82 83
      taosFreeQall(pWorker->qall);
      taosCloseQset(pWorker->qset);
J
Jeff Tao 已提交
84 85 86
    }
  }

S
slguan 已提交
87
  free(wWorkerPool.writeWorker);
S
slguan 已提交
88
  dPrint("dnode write is closed");
S
slguan 已提交
89 90
}

91
/*
 * Route an incoming RPC write request to the write queue of its vgroup.
 *
 * The header fields are converted from network to host byte order in place.
 * For TSDB_MSG_TYPE_SUBMIT, the SMsgHead follows an SMsgDesc prefix.
 * - If no queue exists for the vgId, the client receives
 *   TSDB_CODE_VND_INVALID_VGROUP_ID and the message buffer is freed.
 * - Otherwise the request is wrapped in an SWriteMsg and queued. The buffer is
 *   later freed by dnodeSendRpcVnodeWriteRsp.
 *
 * NOTE(review): pMsg->contLen is not checked before the headers are
 * dereferenced. The taosAllocateQitem result is also not checked for NULL.
 */
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
  char *pBody = (char *)pMsg->pCont;

  // SUBMIT messages carry an SMsgDesc ahead of the SMsgHead
  if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    SMsgDesc *pDesc = (SMsgDesc *)pBody;
    pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
    pBody += sizeof(SMsgDesc);
  }

  SMsgHead *pMsgHead = (SMsgHead *)pBody;
  pMsgHead->vgId    = htonl(pMsgHead->vgId);
  pMsgHead->contLen = htonl(pMsgHead->contLen);

  taos_queue wqueue = vnodeGetWqueue(pMsgHead->vgId);
  if (wqueue == NULL) {
    // unknown vgroup: reject immediately and drop the request buffer
    SRpcMsg rejectRsp = {
      .handle  = pMsg->handle,
      .pCont   = NULL,
      .contLen = 0,
      .code    = TSDB_CODE_VND_INVALID_VGROUP_ID,
      .msgType = 0
    };
    rpcSendResponse(&rejectRsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  SWriteMsg *pItem = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
  pItem->rpcMsg  = *pMsg;
  pItem->pCont   = pBody;
  pItem->contLen = pMsgHead->contLen;
  taosWriteQitem(wqueue, TAOS_QTYPE_RPC, pItem);
}
S
slguan 已提交
125

S
Shengliang Guan 已提交
126
void *dnodeAllocateVnodeWqueue(void *pVnode) {
S
slguan 已提交
127
  SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
J
Jeff Tao 已提交
128
  void *queue = taosOpenQueue();
S
[TD-10]  
slguan 已提交
129
  if (queue == NULL) return NULL;
S
slguan 已提交
130 131 132

  if (pWorker->qset == NULL) {
    pWorker->qset = taosOpenQset();
S
Shuduo Sang 已提交
133 134 135 136
    if (pWorker->qset == NULL) {
      taosCloseQueue(queue);
      return NULL;
    }
S
slguan 已提交
137

138
    taosAddIntoQset(pWorker->qset, queue, pVnode);
J
Jeff Tao 已提交
139
    pWorker->qall = taosAllocateQall();
Y
yifan hao 已提交
140 141 142 143 144
    if (pWorker->qall == NULL) {
      taosCloseQset(pWorker->qset);
      taosCloseQueue(queue);
      return NULL;
    }
S
slguan 已提交
145 146 147 148 149 150
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
Y
yifan hao 已提交
151
      taosFreeQall(pWorker->qall);
S
slguan 已提交
152
      taosCloseQset(pWorker->qset);
Y
yifan hao 已提交
153 154
      taosCloseQueue(queue);
      queue = NULL;
J
Jeff Tao 已提交
155 156
    } else {
      dTrace("write worker:%d is launched", pWorker->workerId);
157
      wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
158
    }
J
Jeff Tao 已提交
159 160

    pthread_attr_destroy(&thAttr);
S
slguan 已提交
161
  } else {
162
    taosAddIntoQset(pWorker->qset, queue, pVnode);
S
slguan 已提交
163
    wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
164
  }
S
slguan 已提交
165

J
Jeff Tao 已提交
166
  dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167

S
slguan 已提交
168 169 170
  return queue;
}

S
Shengliang Guan 已提交
171
/*
 * Close a write queue previously returned by dnodeAllocateVnodeWqueue().
 * TODO: dynamically adjust the number of worker threads after a queue is released.
 */
void dnodeFreeVnodeWqueue(void *wqueue) {
  taosCloseQueue(wqueue);
}

S
Shengliang Guan 已提交
177
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178 179 180 181 182 183
  SWriteMsg *pWrite = (SWriteMsg *)param;

  if (code > 0) return;

  SRpcMsg rpcRsp = {
    .handle  = pWrite->rpcMsg.handle,
J
Jeff Tao 已提交
184 185
    .pCont   = pWrite->rspRet.rsp,
    .contLen = pWrite->rspRet.len,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186 187 188 189 190 191 192 193 194 195
    .code    = code,
  };

  rpcSendResponse(&rpcRsp);
  rpcFreeCont(pWrite->rpcMsg.pCont);
  taosFreeQitem(pWrite);

  vnodeRelease(pVnode);
}

S
slguan 已提交
196 197
static void *dnodeProcessWriteQueue(void *param) {
  SWriteWorker *pWorker = (SWriteWorker *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198 199
  SWriteMsg    *pWrite;
  SWalHead     *pHead;
S
slguan 已提交
200
  int32_t       numOfMsgs;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
  int           type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202
  void         *pVnode, *item;
S
slguan 已提交
203 204

  while (1) {
J
Jeff Tao 已提交
205
    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
206
    if (numOfMsgs == 0) {
207 208
      dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
      break;
S
slguan 已提交
209 210
    }

S
slguan 已提交
211
    for (int32_t i = 0; i < numOfMsgs; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
      pWrite = NULL;
J
Jeff Tao 已提交
213
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214 215 216 217 218 219
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
        pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
        pHead->msgType = pWrite->rpcMsg.msgType;
        pHead->version = 0;
        pHead->len = pWrite->contLen;
220
        dTrace("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221 222 223
      } else {
        pHead = (SWalHead *)item;
      }
S
slguan 已提交
224

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226
      int32_t code = vnodeProcessWrite(pVnode, type, pHead, item);
      if (pWrite) pWrite->rpcMsg.code = code;
S
slguan 已提交
227
    }
S
slguan 已提交
228

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229
    walFsync(vnodeGetWal(pVnode));
S
slguan 已提交
230 231

    // browse all items, and process them one by one
J
Jeff Tao 已提交
232
    taosResetQitems(pWorker->qall);
S
slguan 已提交
233
    for (int32_t i = 0; i < numOfMsgs; ++i) {
J
Jeff Tao 已提交
234
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
S
Shengliang Guan 已提交
237
        dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); 
S
slguan 已提交
238
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
241 242 243 244 245
      }
    }
  }

  return NULL;
S
slguan 已提交
246 247
}

248
UNUSED_FUNC
S
slguan 已提交
249
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
S
slguan 已提交
250
  int32_t num = taosGetQueueNumber(pWorker->qset);
S
slguan 已提交
251 252

  if (num > 0) {
253
     usleep(30000);
Y
yifan hao 已提交
254
     sched_yield();
S
slguan 已提交
255
  } else {
J
Jeff Tao 已提交
256
     taosFreeQall(pWorker->qall);
S
slguan 已提交
257 258
     taosCloseQset(pWorker->qset);
     pWorker->qset = NULL;
J
Jeff Tao 已提交
259
     dTrace("write worker:%d is released", pWorker->workerId);
S
slguan 已提交
260 261 262
     pthread_exit(NULL);
  }
}