schFlowCtrl.c 8.9 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "catalog.h"
#include "query.h"
D
dapan1121 已提交
18
#include "schInt.h"
D
dapan1121 已提交
19 20 21
#include "tmsg.h"
#include "tref.h"

D
dapan1121 已提交
22 23
/**
 * Release the flow control state attached to a job.
 *
 * Visits every SSchFlowControl entry stored in pJob->flowCtrl and destroys the
 * entry's pending task list when one exists, then cleans up the hash table
 * itself and clears the job's pointer to it. Returns immediately when the job
 * has no flow control table.
 *
 * @param pJob job whose flowCtrl table is released
 */
void schFreeFlowCtrl(SSchJob *pJob) {
  if (NULL == pJob->flowCtrl) {
    return;
  }

  for (void *pEntry = taosHashIterate(pJob->flowCtrl, NULL); NULL != pEntry;
       pEntry = taosHashIterate(pJob->flowCtrl, pEntry)) {
    SSchFlowControl *pCtrl = (SSchFlowControl *)pEntry;

    // only entries that queued at least one task own a list
    if (NULL != pCtrl->taskList) {
      taosArrayDestroy(pCtrl->taskList);
    }
  }

  taosHashCleanup(pJob->flowCtrl);
  pJob->flowCtrl = NULL;
}

D
dapan1121 已提交
43
/**
 * Decide whether a job has to run under flow control and prepare it if so.
 *
 * Jobs that are not query jobs never need flow control. For query jobs the
 * table counts of all tasks in pJob->dataSrcTasks are summed; flow control is
 * turned on only when schMgmt.cfg.maxNodeTableNum is positive and the sum has
 * reached it. In that case the job's flowCtrl hash table is created and the
 * job is marked with SCH_SET_JOB_NEED_FLOW_CTRL.
 *
 * @param pJob   job to inspect
 * @param pLevel not referenced by this function
 * @return TSDB_CODE_SUCCESS when the decision was made; the hash table
 *         creation failure is reported as TSDB_CODE_QRY_OUT_OF_MEMORY through
 *         SCH_ERR_RET
 */
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
    return TSDB_CODE_SUCCESS;
  }

  // total number of tables touched by the job's data source tasks
  int32_t totalTableNum = 0;
  int32_t srcTaskNum = taosArrayGetSize(pJob->dataSrcTasks);
  for (int32_t idx = 0; idx < srcTaskNum; ++idx) {
    SSchTask **ppTask = (SSchTask **)taosArrayGet(pJob->dataSrcTasks, idx);
    totalTableNum += (*ppTask)->plan->execNodeStat.tableNum;
  }

  // a non-positive limit switches flow control off entirely
  bool limitEnabled = (schMgmt.cfg.maxNodeTableNum > 0);
  if (!limitEnabled || totalTableNum < schMgmt.cfg.maxNodeTableNum) {
    SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", totalTableNum);
    return TSDB_CODE_SUCCESS;
  }

  pJob->flowCtrl =
      taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->flowCtrl) {
    SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_SET_JOB_NEED_FLOW_CTRL(pJob);

  SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", totalTableNum);

  return TSDB_CODE_SUCCESS;
}

/**
 * Give back the flow control quota held by a task.
 *
 * Looks up the flow control entry of the task's current endpoint in
 * pJob->flowCtrl and, under the entry's write lock, decrements its executing
 * task count and subtracts the task's table count from the entry's table sum.
 *
 * @param pJob  job owning the flowCtrl table
 * @param pTask task whose quota is released
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_SCH_INTERNAL_ERROR when the
 *         endpoint has no flow control entry or the entry has no executing
 *         task left to release
 */
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
  SSchFlowControl *ctrl = NULL;
  int32_t          code = 0;
  SEp             *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

  ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
  if (NULL == ctrl) {
    // the lock has not been taken yet, so return directly instead of jumping to _return
    SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &ctrl->lock);
  if (ctrl->execTaskNum <= 0) {
    // the entry exists but holds no executing task: nothing can be released
    SCH_TASK_ELOG("no executing task in flowCtrl to release quota, fqdn:%s, port:%d, execTaskNum:%d", ep->fqdn,
                  ep->port, ctrl->execTaskNum);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  --ctrl->execTaskNum;
  ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;

  SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
                ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

_return:

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  SCH_RET(code);
}

/**
 * Try to reserve flow control quota for a task on its current endpoint.
 *
 * Outcomes:
 *  - the endpoint has no entry yet: a new entry holding this task is inserted
 *    and *enough is set to true;
 *  - the entry has no executing task, or adding this task's table count keeps
 *    the sum within schMgmt.cfg.maxNodeTableNum: the quota is taken and
 *    *enough is set to true;
 *  - otherwise the task is appended to the entry's pending list, the list is
 *    marked unsorted and *enough is set to false.
 *
 * @param pJob   job owning the flowCtrl table
 * @param pTask  task asking for quota
 * @param enough out: true when the task may be launched now, false when it was
 *               queued or an error occurred
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_OUT_OF_MEMORY when the
 *         hash insert, the list allocation or the list push fails
 */
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
  SSchLevel       *pLevel = pTask->level;
  int32_t          code = 0;
  SSchFlowControl *ctrl = NULL;
  SEp             *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

  // Give the output a defined value up front: the allocation failure paths
  // jump to _return, which reads *enough for logging before any branch below
  // has assigned it.
  *enough = false;

  do {
    ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
    if (NULL == ctrl) {
      // first task seen for this endpoint: create its entry with the task already counted
      SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};

      code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
      if (code) {
        if (HASH_NODE_EXIST(code)) {
          // an entry appeared between the get and the put; look it up again
          continue;
        }

        SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
                    ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);

      *enough = true;
      return TSDB_CODE_SUCCESS;
    }

    SCH_LOCK(SCH_WRITE, &ctrl->lock);

    // an idle entry always accepts the task, whatever its table count
    if (0 == ctrl->execTaskNum) {
      ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
      ++ctrl->execTaskNum;

      *enough = true;
      break;
    }

    int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;

    if (sum <= schMgmt.cfg.maxNodeTableNum) {
      ctrl->tableNumSum = sum;
      ++ctrl->execTaskNum;

      *enough = true;
      break;
    }

    // over the limit: park the task in the entry's pending list
    if (NULL == ctrl->taskList) {
      ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES);
      if (NULL == ctrl->taskList) {
        SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }

    if (NULL == taosArrayPush(ctrl->taskList, &pTask)) {
      SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    *enough = false;
    // the new element was appended at the end, so any previous ordering is gone
    ctrl->sorted = false;

    break;
  } while (true);

_return:

  // reached only with ctrl non-NULL and its write lock held
  SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
                ((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
                ctrl->execTaskNum);

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  SCH_RET(code);
}

H
Hongze Cheng 已提交
185
int32_t schTaskTableNumCompare(const void *key1, const void *key2) {
D
dapan1121 已提交
186 187
  SSchTask *pTask1 = *(SSchTask **)key1;
  SSchTask *pTask2 = *(SSchTask **)key2;
H
Hongze Cheng 已提交
188

D
dapan1121 已提交
189 190 191 192 193 194 195 196
  if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
    return 1;
  } else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
    return -1;
  } else {
    return 0;
  }
}
D
dapan1121 已提交
197

D
dapan1121 已提交
198 199
/**
 * Launch as many pending tasks of one flow control entry as its quota allows.
 *
 * Under the entry's write lock the pending list is sorted by table count in
 * descending order (only when it is not already marked sorted) and scanned
 * from the largest task down. A task is launched when its table count fits
 * into the remaining quota, or when the entry has no executing task at all.
 * Launched tasks are removed from the list and added to the entry's counters.
 * The scan stops once the quota is used up or the smallest remaining task no
 * longer fits.
 *
 * If launching a task fails, the lock is released and the failure is handed
 * to schProcessOnTaskFailure for that task.
 *
 * @param pJob job the tasks belong to
 * @param ctrl flow control entry whose pending list is processed
 * @return TSDB_CODE_SUCCESS when nothing failed, otherwise the code returned
 *         by schProcessOnTaskFailure
 */
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
  SCH_LOCK(SCH_WRITE, &ctrl->lock);

  if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
    SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
    return TSDB_CODE_SUCCESS;
  }

  int32_t   remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
  int32_t   taskNum = taosArrayGetSize(ctrl->taskList);
  int32_t   code = 0;
  SSchTask *pTask = NULL;

  if (taskNum > 1 && !ctrl->sorted) {
    taosArraySort(ctrl->taskList, schTaskTableNumCompare);  // desc order
    // Remember that the list is ordered so later calls skip the sort. Removing
    // elements below keeps the order; the push path clears the flag again.
    ctrl->sorted = true;
  }

  for (int32_t i = 0; i < taskNum; ++i) {
    pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
    SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

    // too big for the remaining quota while other tasks are still running: try a smaller one
    if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
      SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
                    ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

      continue;
    }

    ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
    ++ctrl->execTaskNum;

    taosArrayRemove(ctrl->taskList, i);

    SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
                  ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));

    remainNum -= pTask->plan->execNodeStat.tableNum;
    if (remainNum <= 0) {
      SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port,
                    ctrl->tableNumSum, ctrl->execTaskNum);

      break;
    }

    // elements remain after the removal: the last one is the smallest, so if it
    // does not fit nothing else will
    if (i < (taskNum - 1)) {
      SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
      if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
        SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
                      ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);

        break;
      }
    }

    // the removal shifted the following elements down by one: revisit index i
    // and shrink the bound accordingly
    --i;
    --taskNum;
  }

_return:

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  // pTask is the task whose launch failed
  if (code) {
    code = schProcessOnTaskFailure(pJob, pTask, code);
  }

  SCH_RET(code);
}

/**
 * Release a task's flow control quota and launch tasks waiting on its endpoint.
 *
 * Does nothing for tasks that are not under flow control. Otherwise the
 * task's quota is returned with schDecTaskFlowQuota, the flow control entry of
 * the task's current endpoint is looked up, and its pending list is processed
 * by schLaunchTasksInFlowCtrlListImpl.
 *
 * @param pJob  job owning the flowCtrl table
 * @param pTask task whose quota is released
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_SCH_INTERNAL_ERROR when the
 *         endpoint has no flow control entry; otherwise the error of the
 *         failing step
 */
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));

  SEp             *pEp = SCH_GET_CUR_EP(&pTask->plan->execNode);
  SSchFlowControl *pCtrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, pEp, sizeof(SEp));
  if (NULL == pCtrl) {
    SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", pEp->fqdn, pEp->port);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, pCtrl));

  // explicit return so every path of the function returns a value
  return TSDB_CODE_SUCCESS;
}