executorMain.c 6.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
S
slzhou 已提交
24
#include "tudf.h"
H
Haojun Liao 已提交
25 26

#include "executor.h"
S
slzhou 已提交
27 28 29
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
H
Haojun Liao 已提交
30 31 32
#include "tlosertree.h"
#include "ttypes.h"

33 34
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
    qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
H
Haojun Liao 已提交
35
  assert(readHandle != NULL && pSubplan != NULL);
H
Haojun Liao 已提交
36
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
H
Haojun Liao 已提交
37

38
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
H
Haojun Liao 已提交
39 40 41
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
42

D
dapan1121 已提交
43
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
44
  code = dsDataSinkMgtInit(&cfg);
H
Haojun Liao 已提交
45
  if (code != TSDB_CODE_SUCCESS) {
46
    goto _error;
H
Haojun Liao 已提交
47
  }
D
dapan1121 已提交
48
  
L
Liu Jicong 已提交
49
  if (handle) {
D
dapan1121 已提交
50 51 52 53 54 55 56
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
    
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
L
Liu Jicong 已提交
57
  }
58

H
Haojun Liao 已提交
59
  _error:
H
Haojun Liao 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
  if(pQInfo->sql) {
    int ms = 0;
    char* pcnt = strstr(pQInfo->sql, " count(*)");
    if(pcnt) return 0;
    
    char* pos = strstr(pQInfo->sql, " t_");
    if(pos){
      pos += 3;
      ms = atoi(pos);
      while(*pos >= '0' && *pos <= '9'){
        pos ++;
      }
      char unit_char = *pos;
      if(unit_char == 'h'){
        ms *= 3600*1000;
      } else if(unit_char == 'm'){
        ms *= 60*1000;
      } else if(unit_char == 's'){
        ms *= 1000;
      }
    }
    if(ms == 0) return 0;
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
    
    if(ms < 1000) {
      taosMsleep(ms);
    } else {
      int used_ms = 0;
      while(used_ms < ms) {
        taosMsleep(1000);
        used_ms += 1000;
98
        if(isTaskKilled(pQInfo)){
H
Haojun Liao 已提交
99 100 101 102 103 104 105 106 107 108
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

D
dapan1121 已提交
109
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
H
Haojun Liao 已提交
110 111
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
H
Haojun Liao 已提交
112

D
dapan1121 已提交
113
  *pRes = NULL;
H
Haojun Liao 已提交
114
  int64_t curOwner = 0;
115
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
H
Haojun Liao 已提交
116
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
H
Haojun Liao 已提交
117
           (void*)curOwner);
118
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
D
dapan1121 已提交
119
    return pTaskInfo->code;
H
Haojun Liao 已提交
120 121
  }

H
Haojun Liao 已提交
122
  if (pTaskInfo->cost.start == 0) {
123
    pTaskInfo->cost.start = taosGetTimestampMs();
H
Haojun Liao 已提交
124 125
  }

126
  if (isTaskKilled(pTaskInfo)) {
H
Haojun Liao 已提交
127
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
D
dapan1121 已提交
128
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
129 130 131
  }

  // error occurs, record the error code and return to client
132
  int32_t ret = setjmp(pTaskInfo->env);
H
Haojun Liao 已提交
133
  if (ret != TSDB_CODE_SUCCESS) {
134
    pTaskInfo->code = ret;
135
    cleanUpUdfs();
136
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
D
dapan1121 已提交
137
    return pTaskInfo->code;
H
Haojun Liao 已提交
138 139
  }

H
Haojun Liao 已提交
140
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
141

142 143
  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
H
Haojun Liao 已提交
144
  uint64_t el = (taosGetTimestampUs() - st);
145

H
Haojun Liao 已提交
146
  pTaskInfo->cost.elapsedTime += el;
D
dapan1121 已提交
147 148 149 150
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

151 152
  cleanUpUdfs();

H
Haojun Liao 已提交
153
  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
154
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
H
Haojun Liao 已提交
155

H
Haojun Liao 已提交
156
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
157
         GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
H
Haojun Liao 已提交
158

159
  atomic_store_64(&pTaskInfo->owner, 0);
D
dapan1121 已提交
160
  return pTaskInfo->code;
H
Haojun Liao 已提交
161 162 163
}

int32_t qKillTask(qTaskInfo_t qinfo) {
H
Haojun Liao 已提交
164
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
H
Haojun Liao 已提交
165

H
Haojun Liao 已提交
166
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
167 168 169
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
170
  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
171
  setTaskKilled(pTaskInfo);
H
Haojun Liao 已提交
172 173 174

  // Wait for the query executing thread being stopped/
  // Once the query is stopped, the owner of qHandle will be cleared immediately.
H
Haojun Liao 已提交
175
  while (pTaskInfo->owner != 0) {
H
Haojun Liao 已提交
176 177 178 179 180 181
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
182
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
H
Haojun Liao 已提交
183
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
D
dapan1121 已提交
184

H
Haojun Liao 已提交
185
  if (pTaskInfo == NULL) {
D
dapan1121 已提交
186 187 188
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
189
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
190
  setTaskKilled(pTaskInfo);
D
dapan1121 已提交
191 192 193 194

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
195
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
196
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
H
Haojun Liao 已提交
197

H
Haojun Liao 已提交
198
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
199 200 201
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

202
  return isTaskKilled(pTaskInfo);
H
Haojun Liao 已提交
203 204
}

H
Haojun Liao 已提交
205 206
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
207
  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
H
Haojun Liao 已提交
208

H
Haojun Liao 已提交
209 210
  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
H
Haojun Liao 已提交
211
}
D
dapan1121 已提交
212 213 214 215 216 217 218 219 220

int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
  int32_t capacity = 0;

  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);  
}