dnodeMWrite.c 6.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
21
#include "ttimer.h"
S
Shengliang Guan 已提交
22 23 24 25 26 27
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
28
#include "dnodeMgmt.h"
S
Shengliang Guan 已提交
29 30 31 32 33 34 35 36
#include "dnodeMWrite.h"

// Bookkeeping for a single mnode write worker thread.
typedef struct {
  // Thread handle written by pthread_create; tested for non-zero before resume/join.
  pthread_t thread;
  // Index of this worker inside the pool's writeWorker array.
  int32_t   workerId;
} SMWriteWorker;

typedef struct {
S
TD-1657  
Shengliang Guan 已提交
37 38
  int32_t curNum;
  int32_t maxNum;
S
Shengliang Guan 已提交
39 40 41 42 43 44
  SMWriteWorker *writeWorker;
} SMWriteWorkerPool;

// Worker pool descriptor; set up in dnodeInitMnodeWrite and released in dnodeCleanupMnodeWrite.
static SMWriteWorkerPool tsMWritePool;
// Queue set the worker threads read from; opened in dnodeInitMnodeWrite.
static taos_qset         tsMWriteQset;
// The mnode write queue; NULL while no queue is allocated (see dnodeFreeMnodeWqueue).
static taos_queue        tsMWriteQueue;
45
// Timer handle defined outside this file; passed to taosTmrReset for delayed reprocessing.
extern void *            tsDnodeTmr;
S
Shengliang Guan 已提交
46 47 48 49 50

// Worker thread entry point (defined later in this file); declared here for pthread_create.
static void *dnodeProcessMnodeWriteQueue(void *param);

/*
 * Initialize the mnode write module: open the queue set the workers read
 * from and allocate the worker descriptors. No thread is started here;
 * threads are launched by dnodeAllocateMnodeWqueue().
 *
 * Returns 0 on success, -1 when the queue set or the worker array cannot be
 * allocated. On failure no resource is left allocated.
 */
int32_t dnodeInitMnodeWrite() {
  tsMWriteQset = taosOpenQset();
  if (tsMWriteQset == NULL) return -1;

  tsMWritePool.maxNum = 1;
  tsMWritePool.curNum = 0;
  tsMWritePool.writeWorker = calloc((size_t)tsMWritePool.maxNum, sizeof(SMWriteWorker));

  if (tsMWritePool.writeWorker == NULL) {
    // Do not leak the queue set when the pool allocation fails.
    taosCloseQset(tsMWriteQset);
    tsMWriteQset = NULL;
    return -1;
  }

  for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
    SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
    pWorker->workerId = i;
    dDebug("dnode mwrite worker:%d is created", i);
  }

  dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
  return 0;
}

void dnodeCleanupMnodeWrite() {
S
TD-1657  
Shengliang Guan 已提交
68
  for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
S
Shengliang Guan 已提交
69 70 71 72
    SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
    if (pWorker->thread) {
      taosQsetThreadResume(tsMWriteQset);
    }
S
TD-1657  
Shengliang Guan 已提交
73
    dDebug("dnode mwrite worker:%d is closed", i);
S
Shengliang Guan 已提交
74 75
  }

S
TD-1657  
Shengliang Guan 已提交
76
  for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
S
Shengliang Guan 已提交
77
    SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
S
TD-1657  
Shengliang Guan 已提交
78
    dDebug("dnode mwrite worker:%d start to join", i);
S
Shengliang Guan 已提交
79 80 81
    if (pWorker->thread) {
      pthread_join(pWorker->thread, NULL);
    }
S
TD-1657  
Shengliang Guan 已提交
82
    dDebug("dnode mwrite worker:%d join success", i);
S
Shengliang Guan 已提交
83 84
  }

S
TD-1657  
Shengliang Guan 已提交
85 86
  dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset);

S
Shengliang Guan 已提交
87
  taosCloseQset(tsMWriteQset);
S
TD-1657  
Shengliang Guan 已提交
88
  tsMWriteQset = NULL;
S
Shengliang Guan 已提交
89
  taosTFree(tsMWritePool.writeWorker);
S
Shengliang Guan 已提交
90 91
}

92
int32_t dnodeAllocateMnodeWqueue() {
S
Shengliang Guan 已提交
93
  tsMWriteQueue = taosOpenQueue();
94
  if (tsMWriteQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
95 96 97

  taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);

S
TD-1657  
Shengliang Guan 已提交
98
  for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) {
S
Shengliang Guan 已提交
99 100 101 102 103 104 105 106 107 108 109 110
    SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
    pWorker->workerId = i;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeWriteQueue, pWorker) != 0) {
      dError("failed to create thread to process mwrite queue, reason:%s", strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
S
TD-1657  
Shengliang Guan 已提交
111 112
    tsMWritePool.curNum = i + 1;
    dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum);
S
Shengliang Guan 已提交
113 114
  }

115
  dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue);
S
Shengliang Guan 已提交
116 117 118
  return TSDB_CODE_SUCCESS;
}

119
void dnodeFreeMnodeWqueue() {
S
TD-1657  
Shengliang Guan 已提交
120
  dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue);
S
Shengliang Guan 已提交
121 122 123 124 125 126
  taosCloseQueue(tsMWriteQueue);
  tsMWriteQueue = NULL;
}

/*
 * Entry point for an incoming RPC message destined for the mnode write path.
 *
 * - If the mnode is not running or no write queue exists, the sender is
 *   redirected and the message content is freed.
 * - If no queue item can be allocated, an out-of-memory response is sent
 *   and the message content is freed.
 * - Otherwise the message is wrapped into an SMnodeMsg and queued.
 */
void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
  if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
    dnodeSendRedirectMsg(pMsg, true);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
  if (pWrite == NULL) {
    // Allocation failed: answer the caller instead of dereferencing NULL below.
    SRpcMsg rpcRsp = {
      .handle = pMsg->handle,
      .code   = TSDB_CODE_DND_OUT_OF_MEMORY,
    };
    rpcSendResponse(&rpcRsp);
    rpcFreeCont(pMsg->pCont);
    return;
  }

  mnodeCreateMsg(pWrite, pMsg);

  dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
         taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
  taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}

140
/*
 * Release a write message taken from the mwrite queue: log it, let
 * mnodeCleanupMsg() clean it up, then free the queue item itself.
 */
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
  dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p",
         pWrite->rpcMsg.ahandle,
         pWrite,
         taosMsg[pWrite->rpcMsg.msgType],
         tsMWriteQueue);

  mnodeCleanupMsg(pWrite);
  taosFreeQitem(pWrite);
}

148 149
void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
  SMnodeMsg *pWrite = pMsg;
150
  if (pWrite == NULL) return;
151 152
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
  if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
153 154 155
    dnodeReprocessMnodeWriteMsg(pWrite);
    return;
  }
S
Shengliang Guan 已提交
156 157

  SRpcMsg rpcRsp = {
158
    .handle  = pWrite->rpcMsg.handle,
159 160 161
    .pCont   = pWrite->rpcRsp.rsp,
    .contLen = pWrite->rpcRsp.len,
    .code    = code,
S
Shengliang Guan 已提交
162 163 164
  };

  rpcSendResponse(&rpcRsp);
165
  dnodeFreeMnodeWriteMsg(pWrite);
S
Shengliang Guan 已提交
166 167 168
}

static void *dnodeProcessMnodeWriteQueue(void *param) {
169
  SMnodeMsg *pWrite;
S
Shengliang Guan 已提交
170 171 172 173
  int32_t    type;
  void *     unUsed;
  
  while (1) {
174
    if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
S
TD-1673  
Shengliang Guan 已提交
175
      dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
S
Shengliang Guan 已提交
176 177 178
      break;
    }

179
    dDebug("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite,
180 181 182 183
           taosMsg[pWrite->rpcMsg.msgType]);

    int32_t code = mnodeProcessWrite(pWrite);
    dnodeSendRpcMnodeWriteRsp(pWrite, code);
S
Shengliang Guan 已提交
184 185 186 187
  }

  return NULL;
}
188 189 190 191 192

void dnodeReprocessMnodeWriteMsg(void *pMsg) {
  SMnodeMsg *pWrite = pMsg;

  if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
193
    dDebug("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
194 195
           taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);

196
    dnodeSendRedirectMsg(pMsg, true);
197
    dnodeFreeMnodeWriteMsg(pWrite);
198
  } else {
S
TD-1657  
Shengliang Guan 已提交
199 200
    dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
           taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
201

202 203 204 205 206 207 208 209 210 211 212 213
    taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
  }
}

/*
 * Timer callback registered by dnodeDelayReprocessMnodeWriteMsg():
 * forwards the delayed message to dnodeReprocessMnodeWriteMsg().
 */
static void dnodeDoDelayReprocessMnodeWriteMsg(void *param, void *tmrId) {
  (void)tmrId;  // the timer id is not needed by the callback

  dnodeReprocessMnodeWriteMsg(param);
}

/*
 * Schedule a write message for reprocessing via the dnode timer instead of
 * queueing it again immediately. The delay argument is 300 (presumably
 * milliseconds -- confirm against ttimer.h).
 */
void dnodeDelayReprocessMnodeWriteMsg(void *pMsg) {
  void *tmrId = NULL;  // receives the timer id; not kept afterwards

  taosTmrReset(dnodeDoDelayReprocessMnodeWriteMsg, 300, pMsg, tsDnodeTmr, &tmrId);
}