dnodeVWrite.c 6.8 KB
Newer Older
S
#1177  
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "os.h"
S
slguan 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "taoserror.h"
20
#include "tutil.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "trpc.h"
S
[TD-10]  
slguan 已提交
23
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24
#include "twal.h"
S
slguan 已提交
25 26 27
#include "tglobal.h"
#include "vnode.h"
#include "tdataformat.h"
28 29
#include "dnodeInt.h"
#include "dnodeVWrite.h"
S
slguan 已提交
30
#include "dnodeMgmt.h"
S
slguan 已提交
31

S
slguan 已提交
32
typedef struct {
J
Jeff Tao 已提交
33
  taos_qall  qall;
S
slguan 已提交
34 35
  taos_qset  qset;      // queue set
  pthread_t  thread;    // thread 
S
slguan 已提交
36
  int32_t    workerId;  // worker ID
S
slguan 已提交
37 38
} SWriteWorker;  

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
typedef struct {
J
Jeff Tao 已提交
40
  SRspRet  rspRet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42 43 44 45
  void    *pCont;
  int32_t  contLen;
  SRpcMsg  rpcMsg;
} SWriteMsg;

J
Jeff Tao 已提交
46
typedef struct {
S
slguan 已提交
47 48
  int32_t        max;        // max number of workers
  int32_t        nextId;     // from 0 to max-1, cyclic
S
slguan 已提交
49 50 51
  SWriteWorker  *writeWorker;
} SWriteWorkerPool;

52 53
static void *dnodeProcessWriteQueue(void *param);
static void  dnodeHandleIdleWorker(SWriteWorker *pWorker);
S
slguan 已提交
54 55 56

SWriteWorkerPool wWorkerPool;

S
slguan 已提交
57
int32_t dnodeInitWrite() {
S
slguan 已提交
58 59 60 61
  wWorkerPool.max = tsNumOfCores;
  wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
  if (wWorkerPool.writeWorker == NULL) return -1;

S
slguan 已提交
62
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
S
slguan 已提交
63
    wWorkerPool.writeWorker[i].workerId = i;
S
slguan 已提交
64 65
  }

S
slguan 已提交
66
  dPrint("dnode write is opened");
S
slguan 已提交
67
  return 0;
S
slguan 已提交
68 69
}

S
slguan 已提交
70
void dnodeCleanupWrite() {
71 72 73 74 75 76
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
    SWriteWorker *pWorker =  wWorkerPool.writeWorker + i;
    if (pWorker->thread) {
      taosQsetThreadResume(pWorker->qset);
    }
  }
J
Jeff Tao 已提交
77 78 79 80 81
  
  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
    SWriteWorker *pWorker =  wWorkerPool.writeWorker + i;
    if (pWorker->thread) {
      pthread_join(pWorker->thread, NULL);
82 83
      taosFreeQall(pWorker->qall);
      taosCloseQset(pWorker->qset);
J
Jeff Tao 已提交
84 85 86
    }
  }

S
slguan 已提交
87
  free(wWorkerPool.writeWorker);
S
slguan 已提交
88
  dPrint("dnode write is closed");
S
slguan 已提交
89 90
}

91
/*
 * Route an incoming RPC write request to the write queue of its vgroup.
 *
 * The header fields used here are converted from network byte order in place.
 * For TSDB_MSG_TYPE_SUBMIT, the leading SMsgDesc is converted and skipped
 * first. If vnodeGetWqueue() finds a queue, the request is wrapped in an
 * SWriteMsg and queued. Ownership of pMsg->pCont then moves to the queued
 * item, and dnodeSendRpcWriteRsp() later frees it with rpcFreeCont().
 * Otherwise the request is rejected with TSDB_CODE_INVALID_VGROUP_ID and its
 * body is freed here, because nothing else will release it on that path.
 */
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
  char *pCont = (char *)pMsg->pCont;

  if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    SMsgDesc *pDesc = (SMsgDesc *)pCont;
    pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
    pCont += sizeof(SMsgDesc);
  }

  SMsgHead *pHead = (SMsgHead *) pCont;
  pHead->vgId     = htonl(pHead->vgId);
  pHead->contLen  = htonl(pHead->contLen);

  taos_queue queue = vnodeGetWqueue(pHead->vgId);
  if (queue) {
    // put message into queue
    // NOTE(review): a NULL return from taosAllocateQitem() is not handled; it
    // needs an out-of-memory reply code and a matching vnodeRelease() — confirm
    // how vnodeGetWqueue() references the vnode before adding that path.
    SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
    pWrite->rpcMsg    = *pMsg;
    pWrite->pCont     = pCont;
    pWrite->contLen   = pHead->contLen;

    taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
  } else {
    SRpcMsg rpcRsp = {
      .handle  = pMsg->handle,
      .pCont   = NULL,
      .contLen = 0,
      .code    = TSDB_CODE_INVALID_VGROUP_ID,
      .msgType = 0
    };
    rpcSendResponse(&rpcRsp);

    // the request never reached a queue, so release its body here
    rpcFreeCont(pMsg->pCont);
  }
}
S
slguan 已提交
124

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
125
void *dnodeAllocateWqueue(void *pVnode) {
S
slguan 已提交
126
  SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
J
Jeff Tao 已提交
127
  void *queue = taosOpenQueue();
S
[TD-10]  
slguan 已提交
128
  if (queue == NULL) return NULL;
S
slguan 已提交
129 130 131 132 133

  if (pWorker->qset == NULL) {
    pWorker->qset = taosOpenQset();
    if (pWorker->qset == NULL) return NULL;

134
    taosAddIntoQset(pWorker->qset, queue, pVnode);
J
Jeff Tao 已提交
135
    pWorker->qall = taosAllocateQall();
S
slguan 已提交
136 137
    wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;

S
slguan 已提交
138 139 140 141 142 143 144
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
      taosCloseQset(pWorker->qset);
J
Jeff Tao 已提交
145 146
    } else {
      dTrace("write worker:%d is launched", pWorker->workerId);
S
slguan 已提交
147
    }
J
Jeff Tao 已提交
148 149

    pthread_attr_destroy(&thAttr);
S
slguan 已提交
150
  } else {
151
    taosAddIntoQset(pWorker->qset, queue, pVnode);
S
slguan 已提交
152
    wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
S
slguan 已提交
153
  }
S
slguan 已提交
154

J
Jeff Tao 已提交
155
  dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156

S
slguan 已提交
157 158 159
  return queue;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160
void dnodeFreeWqueue(void *wqueue) {
S
slguan 已提交
161
  taosCloseQueue(wqueue);
S
slguan 已提交
162

S
slguan 已提交
163
  // dynamically adjust the number of threads
S
slguan 已提交
164 165
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166 167 168 169 170 171 172
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
  SWriteMsg *pWrite = (SWriteMsg *)param;

  if (code > 0) return;

  SRpcMsg rpcRsp = {
    .handle  = pWrite->rpcMsg.handle,
J
Jeff Tao 已提交
173 174
    .pCont   = pWrite->rspRet.rsp,
    .contLen = pWrite->rspRet.len,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175 176 177 178 179 180 181 182 183 184
    .code    = code,
  };

  rpcSendResponse(&rpcRsp);
  rpcFreeCont(pWrite->rpcMsg.pCont);
  taosFreeQitem(pWrite);

  vnodeRelease(pVnode);
}

S
slguan 已提交
185 186
static void *dnodeProcessWriteQueue(void *param) {
  SWriteWorker *pWorker = (SWriteWorker *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187 188
  SWriteMsg    *pWrite;
  SWalHead     *pHead;
S
slguan 已提交
189
  int32_t       numOfMsgs;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190
  int           type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191
  void         *pVnode, *item;
S
slguan 已提交
192 193

  while (1) {
J
Jeff Tao 已提交
194
    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
195 196 197
    if (numOfMsgs ==0) { 
      dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
      break;
S
slguan 已提交
198 199
    }

S
slguan 已提交
200
    for (int32_t i = 0; i < numOfMsgs; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
      pWrite = NULL;
J
Jeff Tao 已提交
202
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203 204 205 206 207 208
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
        pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
        pHead->msgType = pWrite->rpcMsg.msgType;
        pHead->version = 0;
        pHead->len = pWrite->contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209
        dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210 211 212
      } else {
        pHead = (SWalHead *)item;
      }
S
slguan 已提交
213

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214 215
      int32_t code = vnodeProcessWrite(pVnode, type, pHead, item);
      if (pWrite) pWrite->rpcMsg.code = code;
S
slguan 已提交
216
    }
S
slguan 已提交
217

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
    walFsync(vnodeGetWal(pVnode));
S
slguan 已提交
219 220

    // browse all items, and process them one by one
J
Jeff Tao 已提交
221
    taosResetQitems(pWorker->qall);
S
slguan 已提交
222
    for (int32_t i = 0; i < numOfMsgs; ++i) {
J
Jeff Tao 已提交
223
      taosGetQitem(pWorker->qall, &type, &item);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224 225 226
      if (type == TAOS_QTYPE_RPC) {
        pWrite = (SWriteMsg *)item;
        dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); 
S
slguan 已提交
227
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229
        taosFreeQitem(item);
        vnodeRelease(pVnode);
S
slguan 已提交
230 231 232 233 234
      }
    }
  }

  return NULL;
S
slguan 已提交
235 236
}

237
UNUSED_FUNC
S
slguan 已提交
238
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
S
slguan 已提交
239
  int32_t num = taosGetQueueNumber(pWorker->qset);
S
slguan 已提交
240 241

  if (num > 0) {
242
     usleep(30000);
S
slguan 已提交
243 244
     sched_yield(); 
  } else {
J
Jeff Tao 已提交
245
     taosFreeQall(pWorker->qall);
S
slguan 已提交
246 247
     taosCloseQset(pWorker->qset);
     pWorker->qset = NULL;
J
Jeff Tao 已提交
248
     dTrace("write worker:%d is released", pWorker->workerId);
S
slguan 已提交
249 250 251
     pthread_exit(NULL);
  }
}