executorMain.c 6.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
S
slzhou 已提交
24
#include "tudf.h"
H
Haojun Liao 已提交
25 26

#include "executor.h"
S
slzhou 已提交
27 28 29
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
H
Haojun Liao 已提交
30 31 32 33
#include "tlosertree.h"
#include "ttypes.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
34
  TdThreadMutex lock;
H
Haojun Liao 已提交
35 36 37 38 39
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;
  bool            closed;
} STaskMgmt;

40 41
/**
 * Create an execution task for a sub-plan and, when requested, its data sinker.
 *
 * Steps, each aborting on the first failure and returning that error code:
 *   1. build the task via createExecTaskInfoImpl() and store it in *pTaskInfo;
 *   2. initialise the data sink manager with a fixed configuration;
 *   3. if `handle` is non-NULL, create the data sinker for pSubplan->pDataSink.
 *
 * @param readHandle  read handle forwarded to the task builder; must not be NULL
 * @param vgId        not used by this function
 * @param taskId      task id forwarded to the task builder
 * @param pSubplan    sub-plan to execute; must not be NULL
 * @param pTaskInfo   out: receives the created task handle
 * @param handle      out (optional): receives the data sinker handle
 * @param model       execution model forwarded to the task builder
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  SExecTaskInfo** ppTask = (SExecTaskInfo**)pTaskInfo;

  int32_t status = createExecTaskInfoImpl(pSubplan, ppTask, readHandle, taskId, model);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // The sinker is optional: callers that pass no handle only get the task.
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
// Test-only helper (compiled under TEST_IMPL): sleeps for a duration encoded in
// the SQL text of the query.
//
// The delay is read from the digits that follow the first " t_" in the SQL;
// a trailing 'h', 'm' or 's' scales it to hours, minutes or seconds, otherwise
// the number is taken as milliseconds.
//
// Returns 0 without sleeping when the SQL contains " count(*)" or the parsed
// delay is 0; returns 1 otherwise (including when the query has no SQL text).
// Delays of one second or more are slept in 1s slices so that a killed task
// ends the wait early.
int waitMoment(SQInfo* pQInfo){
  if (pQInfo->sql == NULL) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* marker = strstr(pQInfo->sql, " t_");
  if (marker != NULL) {
    char* cursor = marker + 3;  // skip over " t_"
    delayMs = atoi(cursor);
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    // the character right after the digits selects the time unit
    switch (*cursor) {
      case 'h':
        delayMs *= 3600*1000;
        break;
      case 'm':
        delayMs *= 60*1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }

  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  for (int sleptMs = 0; sleptMs < delayMs; sleptMs += 1000) {
    taosMsleep(1000);
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

D
dapan1121 已提交
109
/**
 * Run one step of an execution task and hand back the next result block.
 *
 * Only one thread may execute a task at a time: ownership is claimed by
 * compare-and-swapping the calling thread id into pTaskInfo->owner and is
 * released again on EVERY path that claimed it. qKillTask() waits for the
 * owner to drop back to 0, so a path that returned without releasing it would
 * make the killer spin forever and every later call fail with
 * TSDB_CODE_QRY_IN_EXEC.
 *
 * @param tinfo     task handle (an SExecTaskInfo*)
 * @param pRes      out: next data block, or NULL when none was produced
 * @param useconds  out: accumulated elapsed time of the task; written only
 *                  when no block was produced by this call
 * @return TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
 *         TSDB_CODE_SUCCESS (with *pRes == NULL) if the task was already
 *         killed, the non-zero value delivered through pTaskInfo->env if
 *         execution was aborted, otherwise the task's current code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    // We never became the owner here, so there is nothing to release.
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // Give up ownership so that a thread blocked in qKillTask() can proceed.
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  // (a non-zero value here means control came back through pTaskInfo->env)
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
           tstrerror(pTaskInfo->code));
    // Release ownership last; the task must not be touched afterwards.
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  // The total elapsed time is reported only when no block was produced.
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

  cleanUpUdfs();
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);

  // Read the result code BEFORE releasing ownership: once owner is 0 a thread
  // waiting in qKillTask() may go on to tear the task down.
  int32_t code = pTaskInfo->code;
  atomic_store_64(&pTaskInfo->owner, 0);
  return code;
}

/**
 * Mark a task as killed and block until no thread is executing it any more.
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE if the handle is NULL,
 *         TSDB_CODE_SUCCESS once the task has no owner.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;

  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo);

  // Wait for the query executing thread to stop.
  // Once the query is stopped, the owner of qHandle will be cleared immediately.
  // The owner field is written by the executing thread through atomic
  // operations, so it is polled with an atomic load: a plain read here would
  // be a data race and could legally be hoisted out of the loop.
  while (atomic_load_64(&pTaskInfo->owner) != 0) {
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
188
/**
 * Mark a task as killed and return immediately, without waiting for the
 * executing thread to stop.
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE if the handle is NULL,
 *         TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  int32_t        status = TSDB_CODE_QRY_INVALID_QHANDLE;

  if (pTask != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTask));
    setTaskKilled(pTask);
    status = TSDB_CODE_SUCCESS;
  }

  return status;
}

H
Haojun Liao 已提交
201
/**
 * Report the task's state as seen by isTaskKilled().
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE if the handle is NULL, otherwise the
 *         value returned by isTaskKilled() for this task.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;

  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  int32_t killed = isTaskKilled(pTask);
  return killed;
}

H
Haojun Liao 已提交
211 212
/**
 * Log the final row count and cost summary of a task, then destroy it.
 *
 * A NULL handle is accepted and ignored, matching the NULL handling of the
 * other task entry points in this file; without the guard the log statement
 * below would dereference NULL.
 *
 * @param qTaskHandle  task handle (an SExecTaskInfo*), may be NULL
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
218 219 220 221 222 223 224 225 226

/**
 * Collect explain/execution information starting at the task's root operator.
 *
 * @param tinfo   task handle (an SExecTaskInfo*)
 * @param resNum  forwarded to getOperatorExplainExecInfo()
 * @param pRes    forwarded to getOperatorExplainExecInfo()
 * @return the code returned by getOperatorExplainExecInfo()
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  // scratch capacity counter handed to the collector, starting from zero
  int32_t        allocated = 0;
  SExecTaskInfo *pTask = (SExecTaskInfo *)tinfo;

  int32_t status = getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
  return status;
}