dnodeRead.c 6.7 KB
Newer Older
S
#1177  
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
slguan 已提交
17 18
#include "os.h"
#include "taoserror.h"
S
slguan 已提交
19
#include "taosmsg.h"
S
slguan 已提交
20
#include "tlog.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "trpc.h"
S
slguan 已提交
23
#include "dnodeRead.h"
S
slguan 已提交
24
#include "dnodeMgmt.h"
S
#1177  
slguan 已提交
25

S
slguan 已提交
26 27 28 29 30 31 32 33
typedef struct {
  int32_t  code;
  int32_t  count;
  int32_t  numOfVnodes;
} SRpcContext;

typedef struct {
  void        *pCont;
S
slguan 已提交
34
  int32_t      contLen;
S
slguan 已提交
35 36 37 38 39 40 41 42 43 44
  SRpcMsg      rpcMsg;
  void        *pVnode;
  SRpcContext *pRpcContext;  // RPC message context
} SReadMsg;

static void *dnodeProcessReadQueue(void *param);
static void  dnodeProcessReadResult(SReadMsg *pRead);
static void  dnodeHandleIdleReadWorker();
static void  dnodeProcessQueryMsg(SReadMsg *pMsg);
static void  dnodeProcessRetrieveMsg(SReadMsg *pMsg);
S
slguan 已提交
45
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
S
slguan 已提交
46 47 48

// module global variable
static taos_qset readQset;
S
slguan 已提交
49 50 51
static int32_t   threads;    // number of query threads
static int32_t   maxThreads;
static int32_t   minThreads;
S
slguan 已提交
52

S
slguan 已提交
53 54
int32_t dnodeInitRead() {
  dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY]    = dnodeProcessQueryMsg;
S
slguan 已提交
55 56 57 58 59 60 61 62
  dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;

  readQset = taosOpenQset();

  minThreads = 3;
  maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
  if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;

S
slguan 已提交
63
  dPrint("dnode read is opened");
S
slguan 已提交
64
  return 0;
S
slguan 已提交
65
}
S
slguan 已提交
66

S
slguan 已提交
67 68
void dnodeCleanupRead() {
  taosCloseQset(readQset);
S
slguan 已提交
69
  dPrint("dnode read is closed");
S
slguan 已提交
70
}
S
slguan 已提交
71

S
slguan 已提交
72
void dnodeRead(SRpcMsg *pMsg) {
S
slguan 已提交
73
  int32_t     queuedMsgNum = 0;
S
slguan 已提交
74 75
  int32_t     leftLen      = pMsg->contLen;
  char        *pCont       = (char *) pMsg->pCont;
S
slguan 已提交
76
  SRpcContext *pRpcContext = NULL;
S
slguan 已提交
77

S
slguan 已提交
78 79 80 81 82 83 84 85 86
//  SMsgDesc *pDesc = pCont;
//  pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
//  pCont += sizeof(SMsgDesc);
//  if (pDesc->numOfVnodes > 1) {
//    pRpcContext = calloc(sizeof(SRpcContext), 1);
//    pRpcContext->numOfVnodes = pDesc->numOfVnodes;
//  }
  if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
    queuedMsgNum = 0;
S
slguan 已提交
87
  }
S
slguan 已提交
88

S
slguan 已提交
89
  while (leftLen > 0) {
S
slguan 已提交
90 91 92
    SMsgHead *pHead = (SMsgHead *) pCont;
    pHead->vgId    = 1; //htonl(pHead->vgId);
    pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
S
slguan 已提交
93

S
slguan 已提交
94
    void *pVnode = dnodeGetVnode(pHead->vgId);
S
slguan 已提交
95
    if (pVnode == NULL) {
S
slguan 已提交
96 97
      leftLen -= pHead->contLen;
      pCont -= pHead->contLen;
S
slguan 已提交
98 99
      continue;
    }
S
slguan 已提交
100

S
slguan 已提交
101 102
    // put message into queue
    SReadMsg readMsg;
S
slguan 已提交
103 104
    readMsg.rpcMsg      = *pMsg;
    readMsg.pCont       = pCont;
S
slguan 已提交
105
    readMsg.contLen     = pHead->contLen;
S
slguan 已提交
106 107
    readMsg.pRpcContext = pRpcContext;
    readMsg.pVnode      = pVnode;
S
slguan 已提交
108 109 110

    taos_queue queue = dnodeGetVnodeRworker(pVnode);
    taosWriteQitem(queue, &readMsg);
S
slguan 已提交
111 112

    // next vnode
S
slguan 已提交
113 114 115
    leftLen -= pHead->contLen;
    pCont -= pHead->contLen;
    queuedMsgNum++;
S
slguan 已提交
116 117

    dnodeReleaseVnode(pVnode);
S
slguan 已提交
118
  }
S
slguan 已提交
119 120 121 122 123 124 125 126 127 128 129

  if (queuedMsgNum == 0) {
    SRpcMsg rpcRsp = {
        .handle  = pMsg->handle,
        .pCont   = NULL,
        .contLen = 0,
        .code    = TSDB_CODE_INVALID_VGROUP_ID,
        .msgType = 0
    };
    rpcSendResponse(&rpcRsp);
  }
S
slguan 已提交
130 131
}

S
slguan 已提交
132 133
void *dnodeAllocateReadWorker() {
  taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
S
slguan 已提交
134
  if (queue == NULL) return NULL;
S
slguan 已提交
135 136

  taosAddIntoQset(readQset, queue);
S
slguan 已提交
137

S
slguan 已提交
138 139 140 141 142 143
  // spawn a thread to process queue
  if (threads < maxThreads) {
    pthread_t thread;
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
S
slguan 已提交
144

S
slguan 已提交
145 146 147 148 149 150
    if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
    }
  }

  return queue;
S
slguan 已提交
151
}
S
slguan 已提交
152

S
slguan 已提交
153 154 155 156
void dnodeFreeReadWorker(void *rqueue) {
  taosCloseQueue(rqueue);

  // dynamically adjust the number of threads
S
slguan 已提交
157 158
}

S
slguan 已提交
159 160 161 162 163 164 165
static void *dnodeProcessReadQueue(void *param) {
  taos_qset  qset = (taos_qset)param;
  SReadMsg   readMsg;

  while (1) {
    if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
      dnodeHandleIdleReadWorker();
S
slguan 已提交
166
      continue;
S
slguan 已提交
167 168 169 170 171 172
    }

    terrno = 0;
    if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
      (*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
    } else {
S
slguan 已提交
173
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
slguan 已提交
174
    }
S
slguan 已提交
175

S
slguan 已提交
176 177 178 179
    dnodeProcessReadResult(&readMsg);
  }

  return NULL;
S
dnode  
slguan 已提交
180
}
S
slguan 已提交
181

S
slguan 已提交
182
static void dnodeHandleIdleReadWorker() {
S
slguan 已提交
183
  int32_t num = taosGetQueueNumber(readQset);
S
slguan 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201

  if (num == 0 || (num <= minThreads && threads > minThreads)) {
    threads--;
    pthread_exit(NULL);
  } else {
    usleep(100);
    sched_yield();
  }
}

static void dnodeProcessReadResult(SReadMsg *pRead) {
  SRpcContext *pRpcContext = pRead->pRpcContext;
  int32_t      code = 0;

  dnodeReleaseVnode(pRead->pVnode);

  if (pRpcContext) {
    if (terrno) {
S
slguan 已提交
202
      if (pRpcContext->code == 0) pRpcContext->code = terrno;
S
slguan 已提交
203 204
    }

S
slguan 已提交
205
    int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
S
slguan 已提交
206 207 208 209
    if (count < pRpcContext->numOfVnodes) {
      // not over yet, multiple vnodes
      return;
    }
S
slguan 已提交
210

S
slguan 已提交
211 212 213 214 215 216 217 218
    // over, result can be merged now
    code = pRpcContext->code;
  } else {
    code = terrno;
  }

  SRpcMsg rsp;
  rsp.handle = pRead->rpcMsg.handle;
S
slguan 已提交
219 220
  rsp.code   = code;
  rsp.pCont  = NULL;
S
slguan 已提交
221 222 223 224 225
  rpcSendResponse(&rsp);
  rpcFreeCont(pRead->rpcMsg.pCont);  // free the received message
}

static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
S
slguan 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
  void *pQInfo = (void*)100;
  dTrace("query msg is disposed, qInfo:%p", pQInfo);

  SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
  pRsp->code    = 0;
  pRsp->qhandle = htobe64((uint64_t) (pQInfo));

  SRpcMsg rpcRsp = {
      .handle = pMsg->rpcMsg.handle,
      .pCont = pRsp,
      .contLen = sizeof(SQueryTableRsp),
      .code = 0,
      .msgType = 0
  };
  rpcSendResponse(&rpcRsp);
S
slguan 已提交
241 242 243
}

static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
S
slguan 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
  SRetrieveTableMsg *pRetrieve = pMsg->pCont;
  void *pQInfo = htobe64(pRetrieve->qhandle);

  dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);

  assert(pQInfo != NULL);
  int32_t contLen = 100;
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
  pRsp->numOfRows = 0;
  pRsp->precision = 0;
  pRsp->offset    = 0;
  pRsp->useconds  = 0;

  SRpcMsg rpcRsp = {
      .handle = pMsg->rpcMsg.handle,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .msgType = 0
  };
  rpcSendResponse(&rpcRsp);
S
slguan 已提交
265
}