executorMain.c 6.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
S
slzhou 已提交
24
#include "tudf.h"
H
Haojun Liao 已提交
25 26

#include "executor.h"
S
slzhou 已提交
27 28 29
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
H
Haojun Liao 已提交
30 31 32 33
#include "tlosertree.h"
#include "ttypes.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
34
  TdThreadMutex lock;
H
Haojun Liao 已提交
35 36 37 38 39
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;
  bool            closed;
} STaskMgmt;

40 41
/*
 * Build an executor task from a sub-plan and, optionally, its data sinker.
 *
 * Steps, in order:
 *   1. createExecTaskInfoImpl() fills *pTaskInfo from pSubplan.
 *   2. dsDataSinkMgtInit() is called with a fixed configuration
 *      (1000 data blocks overall, 100 per query).
 *   3. If `handle` is non-NULL, dsCreateDataSinker() creates the sinker for
 *      pSubplan->pDataSink and stores it through `handle`.
 *
 * The first step that fails stops the sequence and its code is returned.
 *
 * @param readHandle  read handle forwarded to createExecTaskInfoImpl(); must not be NULL
 * @param vgId        not used by this function
 * @param taskId      task id forwarded to createExecTaskInfoImpl()
 * @param pSubplan    sub-plan to execute; must not be NULL
 * @param pTaskInfo   out: receives the created task (treated as SExecTaskInfo**)
 * @param handle      out, optional: receives the data sink handle
 * @param model       execution model forwarded to createExecTaskInfoImpl()
 * @return TSDB_CODE_SUCCESS or the code of the failing step
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  int32_t code = createExecTaskInfoImpl(pSubplan, (SExecTaskInfo**)pTaskInfo, readHandle, taskId, model);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  code = dsDataSinkMgtInit(&sinkCfg);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // The sinker is optional: callers that pass no handle only get the task.
  if (handle != NULL) {
    code = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return code;
}

#ifdef TEST_IMPL
/*
 * Test-only hook: delay a query according to a marker embedded in its SQL.
 *
 * The SQL text is searched for " t_<number><unit>", where <unit> is 'h', 'm'
 * or 's' (hours, minutes, seconds); with any other character after the
 * digits the number is taken as milliseconds. Delays of one second or more
 * are slept in one-second slices so that a killed task can stop waiting
 * early.
 *
 * @return 0 when the SQL contains " count(*)" or the computed delay is 0;
 *         1 otherwise (including when pQInfo->sql is NULL).
 */
int waitMoment(SQInfo* pQInfo) {
  if (pQInfo->sql == NULL) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* marker = strstr(pQInfo->sql, " t_");
  if (marker != NULL) {
    char* cursor = marker + 3;  // step over " t_"
    delayMs = atoi(cursor);

    // Move to the first character after the digits: the unit suffix.
    for (; *cursor >= '0' && *cursor <= '9'; ++cursor) {
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600 * 1000;
        break;
      case 'm':
        delayMs *= 60 * 1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  for (int sleptMs = 0; sleptMs < delayMs;) {
    taosMsleep(1000);
    sleptMs += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }
  return 1;
}
#endif

D
dapan1121 已提交
109
/*
 * Run one execution step of a task and hand back the next result block.
 *
 * Only one thread may execute a task at a time: the caller claims the task by
 * compare-and-swapping its thread id into pTaskInfo->owner, and the claim is
 * released (owner reset to 0) on every path that leaves this function after
 * a successful claim. qKillTask() waits for owner to become 0, so a missed
 * release would block it forever and would make every later qExecTask() call
 * fail with TSDB_CODE_QRY_IN_EXEC.
 *
 * @param tinfo     task handle (an SExecTaskInfo*); must not be NULL
 * @param pRes      out: next data block from the root operator, or NULL when
 *                  none was produced
 * @param useconds  out: set to the accumulated elapsed time of the task, but
 *                  only when *pRes is NULL; must not be NULL
 * @return TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
 *         TSDB_CODE_SUCCESS if the task was already killed,
 *         the non-zero value delivered through setjmp() if execution aborted,
 *         otherwise pTaskInfo->code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    // Somebody else holds the task; do NOT touch owner here, it is not ours.
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // Release ownership last, after the final use of pTaskInfo, so that a
    // thread waiting in qKillTask() is unblocked.
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  // (setjmp() returns non-zero only when control comes back via a longjmp()
  // on pTaskInfo->env; presumably raised by the operators -- confirm.)
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
           tstrerror(pTaskInfo->code));
    // The abort path must give up ownership too, otherwise the task stays
    // locked to this thread.
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  // The elapsed time is reported only when no block was returned.
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

  teardownUdfs();
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);

  // Read the result code before releasing ownership: once owner is 0 another
  // thread may take over the task.
  int32_t code = pTaskInfo->code;
  atomic_store_64(&pTaskInfo->owner, 0);
  return code;
}

/*
 * Mark a task as killed and block until no thread is executing it.
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL,
 *         TSDB_CODE_SUCCESS once the task has no owner.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Wait for the thread executing the query to stop.
  // Once the query is stopped, the owner of the handle is cleared immediately.
  // NOTE(review): owner is polled with a plain read every 100 ms, while the
  // executing side updates it with atomic operations -- consider an atomic load.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
187
/*
 * Mark a task as killed without waiting for the executing thread to stop.
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL,
 *         TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;

  if (pTask != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTask));
    setTaskKilled(pTask);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

H
Haojun Liao 已提交
200
/*
 * Report the result of isTaskKilled() for a task.
 *
 * NOTE(review): despite the name, the only state consulted here is the
 * "killed" flag, and a NULL handle yields the non-zero error code
 * TSDB_CODE_QRY_INVALID_QHANDLE, which a caller testing the result as a
 * boolean would read as "completed" -- confirm this is intended.
 *
 * @param qinfo  task handle (an SExecTaskInfo*)
 * @return TSDB_CODE_QRY_INVALID_QHANDLE when qinfo is NULL,
 *         otherwise the value of isTaskKilled().
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;

  return (NULL == pTask) ? TSDB_CODE_QRY_INVALID_QHANDLE : isTaskKilled(pTask);
}

H
Haojun Liao 已提交
210 211
/*
 * Log the final row count and cost summary of a task, then destroy it.
 *
 * A NULL handle is ignored, matching the NULL checks performed by the other
 * handle-taking entry points in this file (qKillTask, qAsyncKillTask,
 * qIsTaskCompleted); without the check the debug log below would
 * dereference a NULL pointer.
 *
 * @param qTaskHandle  task handle (an SExecTaskInfo*); may be NULL
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
217 218 219 220 221 222 223 224 225

/*
 * Collect explain/execution information starting from the task's root operator.
 *
 * Thin wrapper over getOperatorExplainExecInfo(): it supplies a local
 * capacity counter initialised to 0 and forwards the output parameters
 * unchanged.
 *
 * @param tinfo   task handle (an SExecTaskInfo*); dereferenced without a NULL check
 * @param resNum  forwarded to the helper; presumably receives the number of
 *                entries written -- confirm against the helper
 * @param pRes    forwarded to the helper; presumably receives the result array
 *                -- confirm against the helper
 * @return the value returned by getOperatorExplainExecInfo()
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  int32_t        allocated = 0;
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  int32_t code = getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
  return code;
}