executorMain.c 6.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23 24 25 26 27 28 29 30 31 32
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"

#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
33
  TdThreadMutex lock;
H
Haojun Liao 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;
  bool            closed;
} STaskMgmt;

/*
 * Callback that kills the task stored in a handle slot.
 *
 * handle : pointer to a slot holding the task handle (dereferenced unconditionally)
 * param1 : unused
 */
static void taskMgmtKillTaskFn(void* handle, void* param1) {
  void* task = *(void**)handle;
  qKillTask(task);
}

/*
 * Release callback for a slot holding a task handle: kills the task and then
 * destroys it. A NULL slot pointer or an empty slot is ignored.
 */
static void freeqinfoFn(void *qhandle) {
  void** slot = qhandle;
  if (slot != NULL && *slot != NULL) {
    // qKillTask() returns only after the task no longer has an owner, so the
    // destroy below runs once the task is idle.
    qKillTask(*slot);
    qDestroyTask(*slot);
  }
}

54 55
/**
 * Create an execution task for a sub-plan and, when requested, its data sinker.
 *
 * @param readHandle  read handle passed to the task builder; must not be NULL (asserted)
 * @param vgId        not used by this function
 * @param taskId      task identifier passed to the task builder
 * @param pSubplan    sub-plan to execute; must not be NULL (asserted)
 * @param pTaskInfo   out: receives the created task handle
 * @param handle      out, optional: when non-NULL a data sinker is created for
 *                    pSubplan->pDataSink and stored here
 * @param model       execution model passed to the task builder
 * @return TSDB_CODE_SUCCESS, or the error code of the first step that failed
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  SExecTaskInfo** ppTask = (SExecTaskInfo**)pTaskInfo;
  int32_t         status = createExecTaskInfoImpl(pSubplan, ppTask, readHandle, taskId, model);
  if (TSDB_CODE_SUCCESS != status) {
    return status;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (TSDB_CODE_SUCCESS != status) {
    return status;
  }

  // The sinker is optional: callers that pass no handle get the task only.
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
// wait moment
/*
 * Test-only helper: sleeps for a duration encoded in the SQL text.
 *
 * The duration is the number following the first " t_" in pQInfo->sql,
 * optionally suffixed with 'h', 'm' or 's'; without a suffix the number is
 * taken as milliseconds. Statements containing " count(*)" never wait.
 *
 * Returns 0 when the SQL asks for no wait, 1 otherwise (including when
 * pQInfo->sql is NULL).
 */
int waitMoment(SQInfo* pQInfo){
  const char* sql = pQInfo->sql;
  if (sql == NULL) {
    return 1;
  }

  if (strstr(sql, " count(*)") != NULL) {
    return 0;
  }

  int   waitMs = 0;
  char* cursor = strstr(sql, " t_");
  if (cursor != NULL) {
    cursor += 3;
    waitMs = atoi(cursor);

    // skip the digits to reach the optional unit suffix
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    switch (*cursor) {
      case 'h':
        waitMs *= 3600*1000;
        break;
      case 'm':
        waitMs *= 60*1000;
        break;
      case 's':
        waitMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (waitMs == 0) {
    return 0;
  }

  printf("test wait sleep %dms. sql=%s ...\n", waitMs, pQInfo->sql);

  if (waitMs < 1000) {
    taosMsleep(waitMs);
    return 1;
  }

  // Sleep in one-second slices so that a kill request is noticed promptly.
  for (int elapsedMs = 0; elapsedMs < waitMs; ) {
    taosMsleep(1000);
    elapsedMs += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

D
dapan1121 已提交
123
/**
 * Run one step of an execution task and return the next result block.
 *
 * Only one thread may execute a task at a time: the caller claims the task by
 * atomically swapping its thread id into pTaskInfo->owner, and every exit path
 * taken after a successful claim stores 0 back so that the task can be run
 * again and so that qKillTask(), which waits for owner == 0, can return.
 *
 * @param tinfo     task handle
 * @param pRes      out: next data block produced by the root operator; set to
 *                  NULL when no block is produced or on any early exit
 * @param useconds  out: written with the accumulated elapsed time only when no
 *                  block is produced on the normal path
 * @return TSDB_CODE_QRY_IN_EXEC if another thread owns the task,
 *         TSDB_CODE_SUCCESS if the task was already killed,
 *         the code delivered through longjmp on abort,
 *         otherwise pTaskInfo->code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    // We do not own the task here, so the owner field must be left untouched.
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // Release ownership; otherwise qKillTask() would wait forever.
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
           tstrerror(pTaskInfo->code));
    // Reached via longjmp out of the operator tree: the normal release at the
    // end of this function was skipped, so release ownership here.
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/**
 * Mark a task as killed and block until no thread is executing it.
 *
 * @param qinfo  task handle
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL, otherwise
 *         TSDB_CODE_SUCCESS once the task has no owner.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;

  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo);

  // Wait for the query executing thread to stop.
  // The executing thread clears the owner when it leaves qExecTask(). The field
  // is written with atomic operations there, so it is read atomically here as
  // well; a plain read could legally be hoisted out of the loop.
  while (atomic_load_64(&pTaskInfo->owner) != 0) {
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
200
/**
 * Mark a task as killed without waiting for the executing thread to stop.
 *
 * @param qinfo  task handle
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL, else TSDB_CODE_SUCCESS.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;

  if (NULL != pTask) {
    qDebug("%s execTask async killed", GET_TASKID(pTask));
    setTaskKilled(pTask);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

H
Haojun Liao 已提交
213
/**
 * Report whether a task has finished.
 *
 * @param qinfo  task handle
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL; otherwise 1 if the
 *         task was killed or its status matches TASK_OVER, and 0 if not.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;

  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (isTaskKilled(pTask)) {
    return 1;
  }

  return (Q_STATUS_EQUAL(pTask->status, TASK_OVER)) ? 1 : 0;
}

H
Haojun Liao 已提交
223 224
/**
 * Log the task's cost summary and destroy it.
 *
 * A NULL handle is ignored, matching the other task entry points in this file
 * (qKillTask, qAsyncKillTask, qIsTaskCompleted), which all tolerate NULL.
 *
 * @param qTaskHandle  task handle; must not be used after this call
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
230 231 232 233 234 235 236 237 238

/**
 * Collect explain/execution information starting at the task's root operator.
 *
 * @param tinfo   task handle (dereferenced unconditionally)
 * @param resNum  out: forwarded to getOperatorExplainExecInfo
 * @param pRes    out: forwarded to getOperatorExplainExecInfo
 * @return the code returned by getOperatorExplainExecInfo
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  // The collector receives a capacity counter by pointer; it starts at zero.
  int32_t        allocated = 0;
  SExecTaskInfo *pTask = (SExecTaskInfo *)tinfo;

  return getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
}