schFlowCtrl.c 8.7 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "schedulerInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"

D
dapan1121 已提交
22 23
/*
 * Release the flow-control table owned by a job.
 *
 * Walks every SSchFlowControl entry stored in pJob->flowCtrl, destroys the
 * entry's pending task list when one exists, then cleans up the hash and
 * resets pJob->flowCtrl to NULL. Returns immediately when the job has no
 * flow-control hash.
 */
void schFreeFlowCtrl(SSchJob *pJob) {
  if (NULL == pJob->flowCtrl) {
    return;
  }

  for (void *pIter = taosHashIterate(pJob->flowCtrl, NULL); NULL != pIter;
       pIter = taosHashIterate(pJob->flowCtrl, pIter)) {
    SSchFlowControl *pEntry = (SSchFlowControl *)pIter;

    // only entries that queued tasks own a task list
    if (NULL != pEntry->taskList) {
      taosArrayDestroy(pEntry->taskList);
    }
  }

  taosHashCleanup(pJob->flowCtrl);
  pJob->flowCtrl = NULL;
}

D
dapan1121 已提交
43
/*
 * Decide whether a job has to run under flow control and, if so, set it up.
 *
 * Non-query jobs never need flow control. For query jobs the table counts of
 * all data source tasks are added up; when the total reaches
 * schMgmt.cfg.maxNodeTableNum the flow-control hash is created and the job is
 * flagged via SCH_SET_JOB_NEED_FLOW_CTRL.
 *
 * pJob   - job to inspect; pJob->flowCtrl is assigned when flow control is needed
 * pLevel - not referenced by this function
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (through
 * SCH_ERR_RET) when the hash cannot be created.
 */
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
    return TSDB_CODE_SUCCESS;
  }

  // total number of tables touched by the job's data source tasks
  int32_t totalTableNum = 0;
  int32_t srcTaskNum = taosArrayGetSize(pJob->dataSrcTasks);
  for (int32_t idx = 0; idx < srcTaskNum; ++idx) {
    SSchTask *pSrcTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, idx);
    totalTableNum += pSrcTask->plan->execNodeStat.tableNum;
  }

  if (totalTableNum < schMgmt.cfg.maxNodeTableNum) {
    SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", totalTableNum);
    return TSDB_CODE_SUCCESS;
  }

  pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->flowCtrl) {
    SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_SET_JOB_NEED_FLOW_CTRL(pJob);

  SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", totalTableNum);

  return TSDB_CODE_SUCCESS;
}

/*
 * Give back the flow-control quota held by a task on its current endpoint.
 *
 * Looks up the SSchFlowControl entry keyed by the task's current endpoint and,
 * under the entry's write lock, decrements the executing task count and
 * subtracts the task's table count from the entry's table sum.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the endpoint
 * has no entry or the entry has no executing task left to release.
 */
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
  SSchFlowControl *ctrl = NULL;
  int32_t code = 0;
  SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

  ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
  if (NULL == ctrl) {
    // the lock is not held yet, so return directly instead of jumping to _return
    SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &ctrl->lock);
  if (ctrl->execTaskNum <= 0) {
    // releasing quota without any executing task means the accounting is broken
    SCH_TASK_ELOG("no executing task to release in flowCtrl, execTaskNum:%d, fqdn:%s, port:%d",
       ctrl->execTaskNum, ep->fqdn, ep->port);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  --ctrl->execTaskNum;
  ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;

  SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
     ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

_return:

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  SCH_RET(code);
}

/*
 * Try to reserve flow-control quota for a task on its current endpoint.
 *
 * The endpoint's SSchFlowControl entry is looked up in pJob->flowCtrl:
 *  - no entry yet: a new entry already accounting for this task is inserted
 *    and *enough is set to true;
 *  - entry with no executing task: the task is admitted unconditionally and
 *    the entry's table sum is reset to this task's table count;
 *  - otherwise the task is admitted only if the entry's table sum plus the
 *    task's table count stays within schMgmt.cfg.maxNodeTableNum; if not, the
 *    task is appended to the entry's pending list and *enough is set to false.
 *
 * pJob   - job owning the flow-control hash
 * pTask  - task asking for quota
 * enough - out: true when the task may be launched now, false when it was queued
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the hash
 * insert or the pending list allocation/push fails.
 */
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
  SSchLevel *pLevel = pTask->level;
  int32_t code = 0;
  SSchFlowControl *ctrl = NULL;
  SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

  do {
    ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
    if (NULL == ctrl) {
      // first task seen for this endpoint: the new entry starts with this task counted in
      SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};

      code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
      if (code) {
        if (HASH_NODE_EXIST(code)) {
          // The key showed up between the get and the put. Retry the lookup,
          // and clear the hash status first so it is not returned as the
          // result of an otherwise successful retry.
          code = TSDB_CODE_SUCCESS;
          continue;
        }

        SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
         ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);

      *enough = true;
      return TSDB_CODE_SUCCESS;
    }

    SCH_LOCK(SCH_WRITE, &ctrl->lock);

    // nothing is executing on this endpoint: admit the task regardless of its size
    if (0 == ctrl->execTaskNum) {
      ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
      ++ctrl->execTaskNum;

      *enough = true;
      break;
    }

    int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;

    if (sum <= schMgmt.cfg.maxNodeTableNum) {
      ctrl->tableNumSum = sum;
      ++ctrl->execTaskNum;

      *enough = true;
      break;
    }

    // over quota: queue the task on the endpoint's pending list (created on first use)
    if (NULL == ctrl->taskList) {
      ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES);
      if (NULL == ctrl->taskList) {
        SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }

    if (NULL == taosArrayPush(ctrl->taskList, &pTask)) {
      SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    *enough = false;
    // a newly appended task invalidates any previous ordering of the list
    ctrl->sorted = false;

    break;
  } while (true);

_return:

  // Every path reaching this label holds ctrl->lock.
  // NOTE(review): on the two error paths *enough has not been written by this
  // function, so the log below prints whatever value the caller passed in.
  SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
     ((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  SCH_RET(code);
}

D
dapan1121 已提交
183 184 185 186 187 188 189 190 191 192 193 194
int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
  SSchTask *pTask1 = *(SSchTask **)key1;
  SSchTask *pTask2 = *(SSchTask **)key2;
  
  if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
    return 1;
  } else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
    return -1;
  } else {
    return 0;
  }
}
D
dapan1121 已提交
195

D
dapan1121 已提交
196 197 198 199 200

/*
 * Launch as many pending tasks of one flow-control entry as its quota allows.
 *
 * Under the entry's write lock the pending list is sorted with
 * schTaskTableNumCompare (if not already sorted) and walked from the front.
 * A task is skipped when its table count exceeds the remaining quota while
 * other tasks are still executing on the endpoint; otherwise it is accounted
 * for, removed from the list and launched. The walk stops once the quota is
 * used up or the last task in the list no longer fits.
 *
 * pJob - job owning the tasks
 * ctrl - flow-control entry of the endpoint whose pending list is processed
 *
 * Returns TSDB_CODE_SUCCESS, or the result of schProcessOnTaskFailure when
 * launching a task fails.
 */
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
  SCH_LOCK(SCH_WRITE, &ctrl->lock);

  if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
    SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
    return TSDB_CODE_SUCCESS;
  }

  int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
  int32_t taskNum = taosArrayGetSize(ctrl->taskList);
  int32_t code = 0;
  SSchTask *pTask = NULL;

  if (taskNum > 1 && !ctrl->sorted) {
    taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
    // Removing elements keeps the order, and appending a task clears the flag
    // again, so remember the sort instead of redoing it on every call.
    ctrl->sorted = true;
  }

  for (int32_t i = 0; i < taskNum; ++i) {
    pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
    SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);

    // Too big for the remaining quota: skip it, unless nothing is executing on
    // this endpoint, in which case the task is launched anyway.
    // NOTE(review): the value logged as remainNum below is ctrl->tableNumSum.
    if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
      SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
         ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

      continue;
    }

    ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
    ++ctrl->execTaskNum;

    taosArrayRemove(ctrl->taskList, i);

    SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
       ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);

    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));

    remainNum -= pTask->plan->execNodeStat.tableNum;
    if (remainNum <= 0) {
      SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d",
         ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum);

      break;
    }

    // tasks remain in the list: stop early when even the last one does not fit
    if (i < (taskNum - 1)) {
      SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
      if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
        SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
           ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);

        break;
      }
    }

    // the element at index i was removed, so revisit the same index with a shorter list
    --i;
    --taskNum;
  }

_return:

  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);

  // failure handling runs after the entry lock has been released
  if (code) {
    code = schProcessOnTaskFailure(pJob, pTask, code);
  }

  SCH_RET(code);
}


/*
 * Release the quota held by pTask and launch pending tasks queued on the same
 * endpoint.
 *
 * Does nothing when SCH_TASK_NEED_FLOW_CTRL reports that the task is not under
 * flow control. Otherwise the task's quota is returned through
 * schDecTaskFlowQuota and the endpoint's flow-control entry is handed to
 * schLaunchTasksInFlowCtrlListImpl.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the endpoint has
 * no flow-control entry, or the error code produced by one of the two helpers.
 */
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));

  SEp *curEp = SCH_GET_CUR_EP(&pTask->plan->execNode);
  SSchFlowControl *nodeCtrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, curEp, sizeof(SEp));

  if (NULL == nodeCtrl) {
    SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", curEp->fqdn, curEp->port);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, nodeCtrl);
  SCH_ERR_RET(code);

  return code; // to avoid compiler error
}