/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "tref.h"
#include "dataSinkMgt.h"
#include "tmsg.h"
#include "tudf.h"

#include "executor.h"
#include "executorimpl.h"
#include "query.h"

// One-time guard passed to taosThreadOnce() so that initRefPool() runs only once.
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;

// Id returned by taosOpenRef() for exchange-operator objects.
// Initialised to -1 and assigned by initRefPool().
int32_t exchangeObjRefPool = -1;

/**
 * Open the reference pool used for exchange operator objects and store its id
 * in exchangeObjRefPool.
 *
 * Invoked through taosThreadOnce() from qCreateExecTask().
 * NOTE(review): 1024 is presumably the pool capacity and
 * doDestroyExchangeOperatorInfo the per-object release callback -- confirm
 * against the taosOpenRef() declaration in tref.h.
 */
static void initRefPool(void) {
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
}
H
Haojun Liao 已提交
32

33
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
34
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
H
Haojun Liao 已提交
35
  assert(readHandle != NULL && pSubplan != NULL);
H
Haojun Liao 已提交
36
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
H
Haojun Liao 已提交
37

38 39
  taosThreadOnce(&initPoolOnce, initRefPool);

40
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
H
Haojun Liao 已提交
41 42 43
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
44

D
dapan1121 已提交
45
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
46
  code = dsDataSinkMgtInit(&cfg);
H
Haojun Liao 已提交
47
  if (code != TSDB_CODE_SUCCESS) {
48
    goto _error;
H
Haojun Liao 已提交
49
  }
D
dapan1121 已提交
50
  
L
Liu Jicong 已提交
51
  if (handle) {
D
dapan1121 已提交
52 53 54 55 56 57 58
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
    
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
L
Liu Jicong 已提交
59
  }
60

H
Haojun Liao 已提交
61
  _error:
H
Haojun Liao 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
/**
 * Test-only helper (compiled only when TEST_IMPL is defined): sleep for a
 * duration encoded in the query's SQL text.
 *
 * The SQL is searched for the marker " t_" followed by a decimal number and an
 * optional unit character: 'h' (hours), 'm' (minutes) or 's' (seconds). With
 * no recognised unit the number is taken as milliseconds. Delays of one second
 * or more are slept in 1000 ms slices so that a killed task can stop the wait
 * early.
 *
 * @param pQInfo  query whose `sql` field is inspected.
 * @return 0 when the SQL contains " count(*)" or yields a zero delay;
 *         1 otherwise (including when `sql` is NULL, in which case nothing
 *         is slept).
 */
int waitMoment(SQInfo* pQInfo){
  if(pQInfo->sql) {
    int ms = 0;
    char* pcnt = strstr(pQInfo->sql, " count(*)");
    if(pcnt) return 0;

    char* pos = strstr(pQInfo->sql, " t_");
    if(pos){
      pos += 3;  // skip the " t_" marker itself
      ms = atoi(pos);
      // advance past the digits to reach the optional unit character
      while(*pos >= '0' && *pos <= '9'){
        pos ++;
      }
      char unit_char = *pos;
      if(unit_char == 'h'){
        ms *= 3600*1000;
      } else if(unit_char == 'm'){
        ms *= 60*1000;
      } else if(unit_char == 's'){
        ms *= 1000;
      }
    }
    if(ms == 0) return 0;
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);

    if(ms < 1000) {
      taosMsleep(ms);
    } else {
      int used_ms = 0;
      while(used_ms < ms) {
        taosMsleep(1000);
        used_ms += 1000;
        // poll the kill flag between slices so cancellation ends the wait
        if(isTaskKilled(pQInfo)){
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

D
dapan1121 已提交
111
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
H
Haojun Liao 已提交
112 113
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
H
Haojun Liao 已提交
114

D
dapan1121 已提交
115
  *pRes = NULL;
H
Haojun Liao 已提交
116
  int64_t curOwner = 0;
117
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
H
Haojun Liao 已提交
118
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
H
Haojun Liao 已提交
119
           (void*)curOwner);
120
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
D
dapan1121 已提交
121
    return pTaskInfo->code;
H
Haojun Liao 已提交
122 123
  }

H
Haojun Liao 已提交
124
  if (pTaskInfo->cost.start == 0) {
125
    pTaskInfo->cost.start = taosGetTimestampMs();
H
Haojun Liao 已提交
126 127
  }

128
  if (isTaskKilled(pTaskInfo)) {
H
Haojun Liao 已提交
129
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
D
dapan1121 已提交
130
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
131 132 133
  }

  // error occurs, record the error code and return to client
134
  int32_t ret = setjmp(pTaskInfo->env);
H
Haojun Liao 已提交
135
  if (ret != TSDB_CODE_SUCCESS) {
136
    pTaskInfo->code = ret;
137
    cleanUpUdfs();
138
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
D
dapan1121 已提交
139
    return pTaskInfo->code;
H
Haojun Liao 已提交
140 141
  }

H
Haojun Liao 已提交
142
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
143

144 145
  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
H
Haojun Liao 已提交
146
  uint64_t el = (taosGetTimestampUs() - st);
147

H
Haojun Liao 已提交
148
  pTaskInfo->cost.elapsedTime += el;
D
dapan1121 已提交
149 150 151 152
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

153 154
  cleanUpUdfs();

H
Haojun Liao 已提交
155
  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
156
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
H
Haojun Liao 已提交
157

H
Haojun Liao 已提交
158
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
159
         GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
H
Haojun Liao 已提交
160

161
  atomic_store_64(&pTaskInfo->owner, 0);
D
dapan1121 已提交
162
  return pTaskInfo->code;
H
Haojun Liao 已提交
163 164 165
}

/**
 * Mark a task as killed and block until no thread is executing it.
 *
 * @param qinfo  task handle (an SExecTaskInfo*); may be NULL.
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 *         TSDB_CODE_SUCCESS once pTaskInfo->owner has been observed as 0.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;

  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo);

  // Wait for the query executing thread to stop.
  // Once the query is stopped, the owner of qHandle is cleared immediately.
  // The owner field is written with atomic operations by the executing
  // thread, so it is read atomically here as well; a plain read could
  // legally be hoisted out of the loop by the compiler.
  while (atomic_load_64(&pTaskInfo->owner) != 0) {
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
184
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
H
Haojun Liao 已提交
185
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
D
dapan1121 已提交
186

H
Haojun Liao 已提交
187
  if (pTaskInfo == NULL) {
D
dapan1121 已提交
188 189 190
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
191
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
192
  setTaskKilled(pTaskInfo);
D
dapan1121 已提交
193 194 195 196

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
197
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
198
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
H
Haojun Liao 已提交
199

H
Haojun Liao 已提交
200
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
201 202 203
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

204
  return isTaskKilled(pTaskInfo);
H
Haojun Liao 已提交
205 206
}

H
Haojun Liao 已提交
207 208
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
209
  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
H
Haojun Liao 已提交
210

H
Haojun Liao 已提交
211 212
  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
H
Haojun Liao 已提交
213
}
D
dapan1121 已提交
214 215 216 217 218 219 220 221 222

int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
  SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
  int32_t capacity = 0;

  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);  
}