executorMain.c 5.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
S
slzhou 已提交
24
#include "tudf.h"
H
Haojun Liao 已提交
25 26

#include "executor.h"
S
slzhou 已提交
27 28 29
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
H
Haojun Liao 已提交
30 31 32
#include "tlosertree.h"
#include "ttypes.h"

33 34
/*
 * Build an execution task for a sub-plan and, when requested, its data sinker.
 *
 * Steps, each aborting with the failing step's code:
 *   1. createExecTaskInfoImpl() fills *pTaskInfo from pSubplan/readHandle.
 *   2. dsDataSinkMgtInit() is called with a fixed configuration
 *      (maxDataBlockNum = 1000, maxDataBlockNumPerQuery = 100).
 *   3. If `handle` is non-NULL, dsCreateDataSinker() is called for
 *      pSubplan->pDataSink.
 *
 * readHandle and pSubplan must be non-NULL (asserted).
 * vgId is accepted but not referenced in this function.
 *
 * Returns TSDB_CODE_SUCCESS, or the code reported by the step that failed.
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  int32_t status = createExecTaskInfoImpl(pSubplan, (SExecTaskInfo**)pTaskInfo, readHandle, taskId, model);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // The sinker is optional: it is only created when the caller supplies a handle slot.
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
// wait moment
/*
 * Test-only helper: delay the query according to a " t_<N><unit>" marker
 * embedded in the SQL text.
 *
 * - No SQL text attached: nothing to do, returns 1.
 * - SQL contains " count(*)": returns 0 without sleeping.
 * - Otherwise the number after " t_" is parsed with atoi() and scaled by the
 *   character following the digits: 'h' hours, 'm' minutes, 's' seconds; any
 *   other character leaves the value as milliseconds.
 * - A resulting delay of 0 returns 0; otherwise the function sleeps and
 *   returns 1. Delays of one second or more are slept in 1000 ms slices and
 *   abandoned early once isTaskKilled() reports the query as killed.
 */
int waitMoment(SQInfo* pQInfo){
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* cursor = strstr(pQInfo->sql, " t_");
  if (cursor != NULL) {
    cursor += 3;  // step over " t_"
    delayMs = atoi(cursor);

    // advance to the first non-digit, which carries the unit
    for (; *cursor >= '0' && *cursor <= '9'; ++cursor) {
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600*1000;
        break;
      case 'm':
        delayMs *= 60*1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }

  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  // Sleep in one-second slices so a kill request is noticed promptly.
  for (int sleptMs = 0; sleptMs < delayMs; sleptMs += 1000) {
    taosMsleep(1000);
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

D
dapan1121 已提交
102
/*
 * Run one execution step of a task and return the next result block.
 *
 * Ownership: a task may be executed by one thread at a time. The caller claims
 * it by compare-and-swapping pTaskInfo->owner from 0 to its thread id. Once
 * the claim succeeds, owner is reset to 0 on EVERY exit path; otherwise
 * qKillTask(), which polls until owner becomes 0, would never return and all
 * later qExecTask() calls would be rejected with TSDB_CODE_QRY_IN_EXEC.
 *
 * @param tinfo     task handle (an SExecTaskInfo*)
 * @param pRes      out: next data block from the root operator, or NULL
 * @param useconds  out: accumulated elapsed time (cost.elapsedTime); written
 *                  only when the root operator returned NULL
 *
 * @return TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
 *         TSDB_CODE_SUCCESS (with *pRes == NULL) if the task was already killed,
 *         the non-zero value delivered through setjmp() on an aborted run,
 *         otherwise the task's current code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  // Try to become the owner; a non-zero previous value means someone else is running the task.
  int64_t curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  // Record the start time once, on the first execution step.
  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // Release ownership before leaving, otherwise the killer waits forever.
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  // (a non-zero value here means control came back through pTaskInfo->env)
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    // Release ownership on the abort path as well; return the local copy so
    // the task is not read after it has been handed back.
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);

  // Capture the result code while this thread still owns the task, then release it.
  int32_t code = pTaskInfo->code;
  atomic_store_64(&pTaskInfo->owner, 0);
  return code;
}

/*
 * Kill a task and block until no thread is executing it any more.
 *
 * The task is flagged as killed, then pTaskInfo->owner is polled every 100 ms
 * until it reads 0 (qExecTask() stores the executing thread id there while it
 * runs and clears it when it is done).
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Wait for the executing thread to let go of the task.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
175
/*
 * Flag a task as killed and return immediately, without waiting for the
 * executing thread to stop (contrast with qKillTask(), which waits).
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
188
/*
 * Report the task's killed state.
 *
 * NOTE: despite the name, the value returned for a valid handle is whatever
 * isTaskKilled() reports. For a NULL handle the error code
 * TSDB_CODE_QRY_INVALID_QHANDLE is returned instead, so callers must be aware
 * that the return value mixes a flag with an error code.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  return (pTask == NULL) ? TSDB_CODE_QRY_INVALID_QHANDLE : isTaskKilled(pTask);
}

H
Haojun Liao 已提交
198 199
/*
 * Log the task's final row count and cost summary, then destroy it.
 *
 * A NULL handle is ignored. The other handle-taking entry points in this file
 * (qKillTask, qAsyncKillTask, qIsTaskCompleted) already reject NULL; without
 * the guard the log statement below would dereference a NULL task.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
205 206 207 208 209 210 211 212 213

/*
 * Collect explain/execution info for the task's operator tree.
 *
 * Delegates to getOperatorExplainExecInfo() starting at the task's root
 * operator. pRes and resNum are passed through unchanged; the capacity
 * counter handed to the helper starts at 0 and is local to this call.
 *
 * Returns whatever getOperatorExplainExecInfo() returns.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  int32_t        allocated = 0;
  SExecTaskInfo *pTask = (SExecTaskInfo *)tinfo;

  int32_t code = getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
  return code;
}