executorMain.c 7.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <tsdb.h>
H
Haojun Liao 已提交
17 18
#include "dataSinkMgt.h"
#include "exception.h"
19 20
#include "os.h"
#include "tarray.h"
H
Haojun Liao 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"

#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"

// Bookkeeping record that groups a pool of query handles with a lock and state flags.
// None of the fields are read or written in this part of the file, so the
// per-field notes below are assumptions to be confirmed against the users.
typedef struct STaskMgmt {
  pthread_mutex_t lock;           // NOTE(review): presumably serializes access to the fields below -- confirm
  SCacheObj      *qinfoPool;      // query handle pool
  int32_t         vgId;           // NOTE(review): presumably the id of the owning vgroup -- confirm
  bool            closed;         // NOTE(review): presumably set once the manager has been shut down -- confirm
} STaskMgmt;

/*
 * Callback that kills the task stored in a handle slot.
 *
 * handle: address of a slot containing the task handle; it is dereferenced
 *         unconditionally, so it must not be NULL.
 * param1: unused.
 */
static void taskMgmtKillTaskFn(void* handle, void* param1) {
  void* pTask = *(void**)handle;
  qKillTask(pTask);
}

/*
 * Release callback for a slot that holds a task handle: the task is first
 * killed (qKillTask) and then destroyed (qDestroyTask).
 *
 * qhandle: address of the slot; nothing happens when either the slot address
 *          or the handle stored in it is NULL.
 */
static void freeqinfoFn(void *qhandle) {
  void** ppTask = qhandle;

  if (ppTask != NULL && *ppTask != NULL) {
    qKillTask(*ppTask);
    qDestroyTask(*ppTask);
  }
}

/*
 * Release every buffer referenced by a task parameter record.
 *
 * The record itself is not freed; only the members listed below are released
 * (pOperator through taosArrayDestroy, everything else through tfree).
 *
 * param: record to clean up. A NULL pointer is accepted and ignored, in line
 *        with the NULL handling of freeqinfoFn in this file.
 */
void freeParam(STaskParam *param) {
  if (param == NULL) {
    return;
  }

  tfree(param->sql);
  tfree(param->tagCond);
  tfree(param->tbnameCond);
  tfree(param->pTableIdList);
  taosArrayDestroy(param->pOperator);
  tfree(param->pExprs);
  tfree(param->pSecExprs);

  tfree(param->pExpr);
  tfree(param->pSecExpr);

  tfree(param->pGroupColIndex);
  tfree(param->pTagColumnInfo);
  tfree(param->pGroupbyExpr);
  tfree(param->prevResult);
}

H
Haojun Liao 已提交
72
/*
 * Build an executable task from a sub-plan and optionally attach a data sinker.
 *
 * readHandle: passed through to createExecTaskInfoImpl; must not be NULL (asserted).
 * vgId:       not used by this function.
 * taskId:     passed through to createExecTaskInfoImpl.
 * pSubplan:   plan to execute; must not be NULL (asserted).
 * pTaskInfo:  receives the created task (handed to createExecTaskInfoImpl as SExecTaskInfo**).
 * handle:     when non-NULL, receives the sinker created from pSubplan->pDataSink.
 *
 * Returns the status of the first step that fails, otherwise the status of the
 * last step executed. The data sink manager is initialized on every call with
 * maxDataBlockNum = 1000 and maxDataBlockNumPerQuery = 100.
 */
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
  assert(readHandle != NULL && pSubplan != NULL);

  int32_t status = createExecTaskInfoImpl(pSubplan, (SExecTaskInfo**)pTaskInfo, readHandle, taskId);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  SDataSinkMgtCfg sinkCfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  status = dsDataSinkMgtInit(&sinkCfg);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  // the sinker is optional: callers that pass no output slot get none
  if (handle != NULL) {
    status = dsCreateDataSinker(pSubplan->pDataSink, handle);
  }

  return status;
}

#ifdef TEST_IMPL
// wait moment
/*
 * Test-only helper: delay a query according to a " t_<N><unit>" marker found
 * in its SQL text.
 *
 * <N> is parsed with atoi; <unit> is the first non-digit character after the
 * marker: 'h' (hours), 'm' (minutes), 's' (seconds); any other character
 * leaves <N> as milliseconds. SQL containing " count(*)" is never delayed.
 *
 * Delays of one second or more are slept in one-second slices so that the
 * wait can stop early once isTaskKilled() reports the query as killed.
 *
 * Returns 0 when no delay applies (count(*) query, no marker, or zero delay),
 * 1 otherwise -- including the case where pQInfo->sql is NULL.
 */
int waitMoment(SQInfo* pQInfo){
  if (pQInfo->sql == NULL) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* marker = strstr(pQInfo->sql, " t_");
  if (marker != NULL) {
    char* cursor = marker + 3;
    delayMs = atoi(cursor);

    // step over the digits to reach the unit character
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600*1000;
        break;
      case 'm':
        delayMs *= 60*1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  for (int sleptMs = 0; sleptMs < delayMs; sleptMs += 1000) {
    taosMsleep(1000);
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

D
dapan1121 已提交
140
/*
 * Run the root operator of a task once and hand back the produced data block.
 *
 * Only one thread may execute a task at a time: ownership is claimed by
 * swapping the calling thread id into pTaskInfo->owner and is released again
 * on EVERY exit path taken after the claim succeeded. qKillTask() waits for
 * owner to become 0, so an exit that keeps the ownership would block it
 * forever and would make every later call fail with TSDB_CODE_QRY_IN_EXEC.
 *
 * tinfo:    task handle (SExecTaskInfo*); dereferenced unconditionally.
 * pRes:     receives the block returned by the root operator, or NULL.
 * useconds: when no block is produced, receives the accumulated elapsed time
 *           of the task; may be NULL if the caller does not need it.
 *
 * Returns TSDB_CODE_QRY_IN_EXEC when another thread currently owns the task,
 * TSDB_CODE_SUCCESS when the task was already killed, the non-zero value
 * delivered through setjmp(pTaskInfo->env) when execution is aborted, and
 * pTaskInfo->code otherwise.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
           (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // release the ownership last: the task may be destroyed right afterwards
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    publishQueryAbortEvent(pTaskInfo, ret);
    pTaskInfo->code = ret;
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
           tstrerror(pTaskInfo->code));
    // release the ownership last and return the local copy of the code
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  bool newgroup = false;
  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);

  uint64_t el = (taosGetTimestampUs() - st);
  pTaskInfo->cost.elapsedTime += el;

  publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

  if (NULL == *pRes && useconds != NULL) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
  pTaskInfo->totalRows += current;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);

  // snapshot the result code before giving up the ownership; once owner is 0 a
  // thread waiting in qKillTask() may go on and destroy the task
  int32_t code = pTaskInfo->code;
  atomic_store_64(&pTaskInfo->owner, 0);
  return code;
}

/*
 * Return the response context stored in a query handle.
 *
 * qinfo: query handle (SQInfo*); must not be NULL (asserted).
 */
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
  SQInfo* pQuery = (SQInfo*) qinfo;

  assert(pQuery != NULL);
  return pQuery->rspContext;
}

/*
 * Mark a task as killed and block until no thread is executing it.
 *
 * qinfo: task handle (SExecTaskInfo*).
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS once the owner field of the task has dropped to 0.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Wait for the executing thread to stop; it clears the owner field of the
  // task when it leaves the execution, which ends this polling loop.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
226
/*
 * Mark a task as killed without waiting for the executing thread to stop.
 *
 * qinfo: task handle (SExecTaskInfo*).
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS
 * otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
239
/*
 * Report whether a task has finished.
 *
 * qinfo: task handle (SExecTaskInfo*).
 *
 * Returns 1 when the task has been killed or its status equals TASK_OVER and
 * 0 otherwise. A NULL handle yields TSDB_CODE_QRY_INVALID_QHANDLE, so callers
 * cannot tell that case apart from "completed" by testing for non-zero alone.
 */
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (isTaskKilled(pTask)) {
    return 1;
  }

  return Q_STATUS_EQUAL(pTask->status, TASK_OVER) ? 1 : 0;
}

H
Haojun Liao 已提交
249 250
/*
 * Log the final row count and cost summary of a task, then destroy it.
 *
 * qTaskHandle: task handle (SExecTaskInfo*). A NULL handle is ignored, the
 *              same way the other handle-taking functions in this file
 *              reject NULL instead of dereferencing it.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);

  queryCostStatis(pTaskInfo);   // print the query cost summary
  doDestroyTask(pTaskInfo);
}

#if 0
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
  int32_t error = TSDB_CODE_SUCCESS;
  void** handle = qAcquireTask(pMgmt, qId);
  if(handle == NULL) return terrno;

  SQInfo* pQInfo = (SQInfo*)(*handle);
  if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }
H
Haojun Liao 已提交
268
  qWarn("%s be killed(no memory commit).", pQInfo->qId);
H
Haojun Liao 已提交
269
  setTaskKilled(pQInfo);
H
Haojun Liao 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284

  // wait query stop
  int32_t loop = 0;
  while (pQInfo->owner != 0) {
    taosMsleep(waitMs);
    if(loop++ > waitCount){
      error = TSDB_CODE_FAILED;
      break;
    }
  }

  qReleaseTask(pMgmt, (void **)&handle, true);
  return error;
}

L
Liu Jicong 已提交
285
#endif