executorMain.c 6.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include <vnode.h>
H
Haojun Liao 已提交
17
#include "dataSinkMgt.h"
18 19
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
20
#include "tcache.h"
21
#include "texception.h"
H
Haojun Liao 已提交
22 23
#include "tglobal.h"
#include "tmsg.h"
S
slzhou 已提交
24
#include "tudf.h"
H
Haojun Liao 已提交
25 26

#include "executor.h"
S
slzhou 已提交
27 28 29
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
H
Haojun Liao 已提交
30 31 32 33
#include "tlosertree.h"
#include "ttypes.h"

typedef struct STaskMgmt {
wafwerar's avatar
wafwerar 已提交
34
  TdThreadMutex lock;
35 36 37
  SCacheObj    *qinfoPool;  // query handle pool
  int32_t       vgId;
  bool          closed;
H
Haojun Liao 已提交
38 39
} STaskMgmt;

40 41
/*
 * Create an executable task from a sub-plan and, when a sink handle is supplied,
 * the data sinker attached to the plan.
 *
 * Steps, each aborting with its own error code on failure:
 *   1. createExecTaskInfoImpl() fills *pTaskInfo.
 *   2. dsDataSinkMgtInit() is called with a fixed configuration
 *      (1000 data blocks in total, 100 per query).
 *   3. If handle is non-NULL, dsCreateDataSinker() is called for pSubplan->pDataSink.
 *
 * readHandle and pSubplan must not be NULL (asserted). vgId is not used here.
 * Nothing created by an earlier step is released in this function when a later
 * step fails.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
int32_t qCreateExecTask(SReadHandle *readHandle, int32_t vgId, uint64_t taskId, SSubplan *pSubplan,
                        qTaskInfo_t *pTaskInfo, DataSinkHandle *handle, EOPTR_EXEC_MODEL model) {
  assert(readHandle != NULL && pSubplan != NULL);

  int32_t status = createExecTaskInfoImpl(pSubplan, (SExecTaskInfo **)pTaskInfo, readHandle, taskId, model);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // The sinker is optional: callers that pass no handle get a task without one.
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
// wait moment
66 67 68 69 70 71 72 73
int waitMoment(SQInfo *pQInfo) {
  if (pQInfo->sql) {
    int   ms = 0;
    char *pcnt = strstr(pQInfo->sql, " count(*)");
    if (pcnt) return 0;

    char *pos = strstr(pQInfo->sql, " t_");
    if (pos) {
H
Haojun Liao 已提交
74 75
      pos += 3;
      ms = atoi(pos);
76 77
      while (*pos >= '0' && *pos <= '9') {
        pos++;
H
Haojun Liao 已提交
78 79
      }
      char unit_char = *pos;
80 81 82 83 84
      if (unit_char == 'h') {
        ms *= 3600 * 1000;
      } else if (unit_char == 'm') {
        ms *= 60 * 1000;
      } else if (unit_char == 's') {
H
Haojun Liao 已提交
85 86 87
        ms *= 1000;
      }
    }
88
    if (ms == 0) return 0;
H
Haojun Liao 已提交
89
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
90 91

    if (ms < 1000) {
H
Haojun Liao 已提交
92 93 94
      taosMsleep(ms);
    } else {
      int used_ms = 0;
95
      while (used_ms < ms) {
H
Haojun Liao 已提交
96 97
        taosMsleep(1000);
        used_ms += 1000;
98
        if (isTaskKilled(pQInfo)) {
H
Haojun Liao 已提交
99 100 101 102 103 104 105 106 107 108
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

109 110
/*
 * Run one step of a task: pull the next result block from the root operator.
 *
 * tinfo     task handle (an SExecTaskInfo *); must not be NULL.
 * pRes      out: the block returned by the root operator, or NULL when it returned none.
 * useconds  out, optional: receives the accumulated elapsed time of the task
 *           (microseconds) when no block is returned. May be NULL.
 *
 * Only one thread may execute a task at a time. Ownership is claimed with a
 * compare-and-swap on pTaskInfo->owner and is released on EVERY path that
 * acquired it. A path that returned while still holding ownership would make
 * qKillTask() wait forever and would make every later qExecTask() call fail
 * with TSDB_CODE_QRY_IN_EXEC.
 *
 * Returns:
 *   TSDB_CODE_QRY_IN_EXEC  another thread currently owns the task;
 *   TSDB_CODE_SUCCESS      the task was already killed (nothing is executed);
 *   the non-zero value delivered through pTaskInfo->env when execution aborts;
 *   otherwise pTaskInfo->code as it stands after the step.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  int64_t curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    // Ownership was NOT acquired here, so it must not be released either.
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void *)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // Control comes back here with a non-zero value when a longjmp targets
  // pTaskInfo->env; record the error code and return it to the caller.
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  // The total elapsed time is reported only when no block was produced.
  if (NULL == *pRes && useconds != NULL) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  pTaskInfo->totalRows += current;

  cleanUpUdfs();

  // Read the result code while this thread still owns the task, then release it.
  int32_t code = pTaskInfo->code;
  atomic_store_64(&pTaskInfo->owner, 0);
  return code;
}

/*
 * Kill a task and block until no thread is executing it.
 *
 * Marks the task as killed, then polls pTaskInfo->owner every 100 ms until it
 * reads 0.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 *
 * NOTE(review): owner is written with atomic operations in qExecTask() but is
 * read here with a plain load -- consider an atomic load for consistency.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Wait for the thread executing the query to stop.
  // Once the query is stopped, the owner of the handle is cleared immediately.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
185
/*
 * Mark a task as killed and return immediately; unlike qKillTask(), this does
 * not wait for the executing thread to stop.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
198
/*
 * Report the result of isTaskKilled() for the task.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle; otherwise the value
 * of isTaskKilled().
 *
 * NOTE(review): despite the name, this checks the killed flag only, and the
 * NULL-handle error code is also non-zero -- confirm callers do not treat the
 * result as a plain boolean.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTask = (SExecTaskInfo *)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  return isTaskKilled(pTask);
}

H
Haojun Liao 已提交
208
/*
 * Log the final row count and cost summary of a task, then destroy it.
 *
 * A NULL handle is accepted and ignored, matching the NULL handling of the
 * other task entry points in this file; previously it was dereferenced by the
 * log statement.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}
D
dapan1121 已提交
215 216 217

/*
 * Collect explain/execution information for the task's operator tree by
 * calling getOperatorExplainExecInfo() on the root operator.
 *
 * tinfo   task handle (an SExecTaskInfo *); dereferenced without a NULL check.
 * resNum  forwarded to getOperatorExplainExecInfo().
 * pRes    forwarded to getOperatorExplainExecInfo().
 *
 * A local capacity counter, starting at 0, is also passed to the helper and
 * discarded afterwards.
 *
 * Returns the helper's result code.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  int32_t        allocated = 0;
  SExecTaskInfo *pTask = (SExecTaskInfo *)tinfo;

  return getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
}