executorMain.c 6.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
S
Shengliang Guan 已提交
18
#include "texception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23 24 25 26 27 28 29 30 31 32
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"

#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
33
  TdThreadMutex lock;
H
Haojun Liao 已提交
34 35 36 37 38
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;
  bool            closed;
} STaskMgmt;

39 40
/*
 * Create an execution task from a sub-plan and, when requested, its data sinker.
 *
 * readHandle  read handle forwarded to createExecTaskInfoImpl(); must not be NULL
 * vgId        not used by this function
 * taskId      task identifier forwarded to createExecTaskInfoImpl()
 * pSubplan    sub-plan to execute; must not be NULL
 * pTaskInfo   out: receives the created task (treated as SExecTaskInfo**)
 * handle      out, optional: when non-NULL a data sinker is created for
 *             pSubplan->pDataSink and stored here
 * model       execution model forwarded to createExecTaskInfoImpl()
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first step that failed.
 *
 * NOTE(review): when a step after task creation fails, the task stored in
 * *pTaskInfo is not released here - confirm that the caller destroys it.
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  SExecTaskInfo** ppTask = (SExecTaskInfo**)pTaskInfo;

  int32_t status = createExecTaskInfoImpl(pSubplan, ppTask, readHandle, taskId, model);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // the data sink manager is initialised on every call, before any sinker is created
  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // a sinker is only built when the caller supplied somewhere to store it
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
/*
 * Test-only hook: delay a query according to a " t_<number><unit>" marker in its SQL text.
 *
 * The unit is the single character that follows the digits: 'h' (hours),
 * 'm' (minutes), 's' (seconds); any other character means the number is
 * already in milliseconds. Only that one character is inspected, so a suffix
 * such as "ms" is read as minutes.
 *
 * Only plain decimal digits are accepted after " t_"; a sign or whitespace
 * yields a delay of 0. The computed delay is clamped to INT32_MAX milliseconds
 * so that neither the digit accumulation nor the unit scaling can overflow.
 *
 * Delays of one second or more are slept in slices of at most one second, and
 * the kill flag is polled after every slice so a cancelled query stops waiting.
 * The final slice is shortened so the total never exceeds the requested delay.
 *
 * Returns 0 when no delay applies (SQL contains " count(*)", or the delay is 0),
 * 1 otherwise - including when pQInfo->sql is NULL.
 */
int waitMoment(SQInfo* pQInfo) {
  if (pQInfo->sql == NULL) {
    return 1;
  }

  // count(*) statements are never delayed
  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int64_t     waitMs = 0;
  const char* marker = strstr(pQInfo->sql, " t_");
  if (marker != NULL) {
    const char* p = marker + 3;

    // accumulate digits in 64 bits; stop growing once past INT32_MAX so the value cannot overflow
    while (*p >= '0' && *p <= '9') {
      if (waitMs <= INT32_MAX) {
        waitMs = waitMs * 10 + (*p - '0');
      }
      p++;
    }

    switch (*p) {
      case 'h':
        waitMs *= 3600 * 1000;
        break;
      case 'm':
        waitMs *= 60 * 1000;
        break;
      case 's':
        waitMs *= 1000;
        break;
      default:  // no recognised unit: the number is already milliseconds
        break;
    }

    if (waitMs > INT32_MAX) {
      waitMs = INT32_MAX;
    }
  }

  if (waitMs == 0) {
    return 0;
  }

  int ms = (int)waitMs;
  printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);

  if (ms < 1000) {
    taosMsleep(ms);
  } else {
    int remaining = ms;
    while (remaining > 0) {
      int slice = (remaining < 1000) ? remaining : 1000;
      taosMsleep(slice);
      remaining -= slice;

      if (isTaskKilled(pQInfo)) {
        printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
        break;
      }
    }
  }

  return 1;
}
#endif

D
dapan1121 已提交
108
/*
 * Run the task once and hand back the next result block produced by its root operator.
 *
 * tinfo     task handle (an SExecTaskInfo*)
 * pRes      out: the block returned by the root operator; NULL when nothing was
 *           produced or when the call returns early
 * useconds  out, optional: receives the accumulated execution time in microseconds,
 *           written only when no block was produced
 *
 * Only one thread may execute a task at a time. Ownership is claimed by
 * atomically moving pTaskInfo->owner from 0 to the caller's thread id and is
 * released on EVERY exit path taken after a successful claim. qKillTask()
 * waits for owner to return to 0, so a path that forgets to release it would
 * make that wait endless and would make every later call fail with
 * TSDB_CODE_QRY_IN_EXEC.
 *
 * Returns:
 *   TSDB_CODE_QRY_IN_EXEC  another thread currently owns the task
 *   TSDB_CODE_SUCCESS      the task was already killed (nothing executed)
 *   the non-zero value delivered through setjmp() when execution aborts
 *   pTaskInfo->code        otherwise
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  int64_t curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    // ownership was NOT acquired here, so owner must be left untouched
    return pTaskInfo->code;
  }

  // remember when the task was executed for the first time
  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    atomic_store_64(&pTaskInfo->owner, 0);  // release ownership before the early return
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  // (presumably the operators longjmp() back to this point with the error code - confirm)
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
           tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);  // release ownership on the abort path as well
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  // the accumulated time is reported only when no block was produced
  if (NULL == *pRes && useconds != NULL) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Mark a task as killed and block until no thread is executing it any more.
 *
 * qinfo  task handle (an SExecTaskInfo*)
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS
 * once the task's owner field has dropped back to 0.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;

  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo);

  // Wait for the query executing thread to stop.
  // Once the query is stopped, the owner of the task is cleared immediately.
  // owner is written with atomic operations by the executing thread, so it is
  // read atomically here too instead of through a plain (racy) load.
  while (atomic_load_64(&pTaskInfo->owner) != 0) {
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
185
/*
 * Flag a task as killed and return at once, without waiting for the thread
 * that may currently be executing it.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;

  if (pTask != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTask));
    setTaskKilled(pTask);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

H
Haojun Liao 已提交
198
/*
 * Report whether a task has finished.
 *
 * Returns 1 when the task has been killed or its status equals TASK_OVER,
 * 0 when neither holds, and TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;

  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  // the kill flag is checked first; the status is only consulted when it is not set
  if (isTaskKilled(pTask)) {
    return 1;
  }

  return Q_STATUS_EQUAL(pTask->status, TASK_OVER) ? 1 : 0;
}

H
Haojun Liao 已提交
208 209
/*
 * Log the final row count and cost summary of a task, then destroy it.
 *
 * qTaskHandle  task handle (an SExecTaskInfo*); a NULL handle is ignored, in
 *              line with the NULL checks of the other handle-based functions
 *              in this file
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;  // nothing to destroy; the log line below would dereference the handle
  }

  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
215 216 217 218 219 220 221 222 223

/*
 * Collect explain/execution information for a task by delegating to
 * getOperatorExplainExecInfo() on the task's root operator.
 *
 * tinfo   task handle (an SExecTaskInfo*)
 * resNum  forwarded unchanged to getOperatorExplainExecInfo()
 * pRes    forwarded unchanged to getOperatorExplainExecInfo()
 *
 * Returns whatever getOperatorExplainExecInfo() returns.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  // scratch counter handed to the helper; it starts at 0 and is discarded afterwards
  int32_t allocated = 0;

  return getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
}