executorMain.c 6.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <tsdb.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23 24 25 26 27 28 29 30 31 32
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"

#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
33
  TdThreadMutex lock;
H
Haojun Liao 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;
  bool            closed;
} STaskMgmt;

static void taskMgmtKillTaskFn(void* handle, void* param1) {
  void** fp = (void**)handle;
  qKillTask(*fp);
}

static void freeqinfoFn(void *qhandle) {
  void** handle = qhandle;
  if (handle == NULL || *handle == NULL) {
    return;
  }

  qKillTask(*handle);
  qDestroyTask(*handle);
}

54 55
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
H
Haojun Liao 已提交
56
  assert(readHandle != NULL && pSubplan != NULL);
H
Haojun Liao 已提交
57
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
H
Haojun Liao 已提交
58

59
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
H
Haojun Liao 已提交
60 61 62
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
63

D
dapan1121 已提交
64
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
65
  code = dsDataSinkMgtInit(&cfg);
H
Haojun Liao 已提交
66
  if (code != TSDB_CODE_SUCCESS) {
67
    goto _error;
H
Haojun Liao 已提交
68
  }
L
Liu Jicong 已提交
69 70 71
  if (handle) {
    code = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }
72

H
Haojun Liao 已提交
73
  _error:
H
Haojun Liao 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
  if(pQInfo->sql) {
    int ms = 0;
    char* pcnt = strstr(pQInfo->sql, " count(*)");
    if(pcnt) return 0;
    
    char* pos = strstr(pQInfo->sql, " t_");
    if(pos){
      pos += 3;
      ms = atoi(pos);
      while(*pos >= '0' && *pos <= '9'){
        pos ++;
      }
      char unit_char = *pos;
      if(unit_char == 'h'){
        ms *= 3600*1000;
      } else if(unit_char == 'm'){
        ms *= 60*1000;
      } else if(unit_char == 's'){
        ms *= 1000;
      }
    }
    if(ms == 0) return 0;
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
    
    if(ms < 1000) {
      taosMsleep(ms);
    } else {
      int used_ms = 0;
      while(used_ms < ms) {
        taosMsleep(1000);
        used_ms += 1000;
112
        if(isTaskKilled(pQInfo)){
H
Haojun Liao 已提交
113 114 115 116 117 118 119 120 121 122
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

D
dapan1121 已提交
123
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
H
Haojun Liao 已提交
124 125
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
H
Haojun Liao 已提交
126

D
dapan1121 已提交
127
  *pRes = NULL;
H
Haojun Liao 已提交
128
  int64_t curOwner = 0;
129
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
H
Haojun Liao 已提交
130
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
H
Haojun Liao 已提交
131
           (void*)curOwner);
132
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
D
dapan1121 已提交
133
    return pTaskInfo->code;
H
Haojun Liao 已提交
134 135
  }

H
Haojun Liao 已提交
136
  if (pTaskInfo->cost.start == 0) {
137
    pTaskInfo->cost.start = taosGetTimestampMs();
H
Haojun Liao 已提交
138 139
  }

140
  if (isTaskKilled(pTaskInfo)) {
H
Haojun Liao 已提交
141
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
D
dapan1121 已提交
142
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
143 144 145
  }

  // error occurs, record the error code and return to client
146
  int32_t ret = setjmp(pTaskInfo->env);
H
Haojun Liao 已提交
147
  if (ret != TSDB_CODE_SUCCESS) {
148 149
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
H
Haojun Liao 已提交
150
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
151
           tstrerror(pTaskInfo->code));
D
dapan1121 已提交
152
    return pTaskInfo->code;
H
Haojun Liao 已提交
153 154
  }

H
Haojun Liao 已提交
155
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
156 157

  bool newgroup = false;
158
  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
H
Haojun Liao 已提交
159
  int64_t st = 0;
H
Haojun Liao 已提交
160

161
  st = taosGetTimestampUs();
H
Haojun Liao 已提交
162
  *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
H
Haojun Liao 已提交
163

H
Haojun Liao 已提交
164 165 166
  uint64_t el = (taosGetTimestampUs() - st);
  pTaskInfo->cost.elapsedTime += el;

167
  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
H
Haojun Liao 已提交
168

D
dapan1121 已提交
169 170 171 172
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

H
Haojun Liao 已提交
173 174 175
  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

H
Haojun Liao 已提交
176 177
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
H
Haojun Liao 已提交
178

179
  atomic_store_64(&pTaskInfo->owner, 0);
D
dapan1121 已提交
180
  return pTaskInfo->code;
H
Haojun Liao 已提交
181 182 183
}

int32_t qKillTask(qTaskInfo_t qinfo) {
H
Haojun Liao 已提交
184
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
H
Haojun Liao 已提交
185

H
Haojun Liao 已提交
186
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
187 188 189
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
190
  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
191
  setTaskKilled(pTaskInfo);
H
Haojun Liao 已提交
192 193 194

  // Wait for the query executing thread being stopped/
  // Once the query is stopped, the owner of qHandle will be cleared immediately.
H
Haojun Liao 已提交
195
  while (pTaskInfo->owner != 0) {
H
Haojun Liao 已提交
196 197 198 199 200 201
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
202
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
H
Haojun Liao 已提交
203
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
D
dapan1121 已提交
204

H
Haojun Liao 已提交
205
  if (pTaskInfo == NULL) {
D
dapan1121 已提交
206 207 208
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
209
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
210
  setTaskKilled(pTaskInfo);
D
dapan1121 已提交
211 212 213 214

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
215
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
216
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
H
Haojun Liao 已提交
217

H
Haojun Liao 已提交
218
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
219 220 221
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

222
  return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER);
H
Haojun Liao 已提交
223 224
}

H
Haojun Liao 已提交
225 226
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
H
Haojun Liao 已提交
227
  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);
H
Haojun Liao 已提交
228

H
Haojun Liao 已提交
229 230
  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
H
Haojun Liao 已提交
231
}
D
dapan1121 已提交
232 233 234 235 236 237 238 239 240

int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
  int32_t capacity = 0;

  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);  
}