executorMain.c 12.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "dataSinkMgt.h"
L
Liu Jicong 已提交
17
#include "os.h"
H
Haojun Liao 已提交
18
#include "tmsg.h"
L
Liu Jicong 已提交
19
#include "tref.h"
S
slzhou 已提交
20
#include "tudf.h"
H
Haojun Liao 已提交
21 22

#include "executor.h"
S
slzhou 已提交
23 24
#include "executorimpl.h"
#include "query.h"
25 26

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
L
Liu Jicong 已提交
27
int32_t             exchangeObjRefPool = -1;
28

L
Liu Jicong 已提交
29
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
30 31 32 33
static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
H
Haojun Liao 已提交
34

35
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
36
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
L
Liu Jicong 已提交
37
  assert(pSubplan != NULL);
H
Haojun Liao 已提交
38
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
H
Haojun Liao 已提交
39

40
  taosThreadOnce(&initPoolOnce, initRefPool);
41
  atexit(cleanupRefPool);
42

43
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
H
Haojun Liao 已提交
44 45 46
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
47

D
dapan1121 已提交
48
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
49
  code = dsDataSinkMgtInit(&cfg);
H
Haojun Liao 已提交
50
  if (code != TSDB_CODE_SUCCESS) {
51
    goto _error;
H
Haojun Liao 已提交
52
  }
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
  if (handle) {
D
dapan1121 已提交
55
    void* pSinkParam = NULL;
D
dapan1121 已提交
56
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
D
dapan1121 已提交
57 58 59
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
L
Liu Jicong 已提交
60

D
dapan1121 已提交
61
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
L
Liu Jicong 已提交
62
  }
63

L
Liu Jicong 已提交
64
_error:
H
Haojun Liao 已提交
65 66 67 68 69 70
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
L
Liu Jicong 已提交
71 72 73
/*
 * Test-only helper (compiled under TEST_IMPL): derives a delay from a
 * " t_<number><unit>" marker in the query's SQL text and sleeps for it.
 *
 * The unit suffix may be 'h', 'm' or 's'; any other character leaves the
 * number interpreted as milliseconds. Delays of one second or more are slept
 * in 1000 ms slices so that a killed task can end the wait early.
 *
 * Returns 0 when the SQL contains " count(*)" or yields a zero delay,
 * otherwise 1 (including when the query has no SQL text).
 */
int waitMoment(SQInfo* pQInfo) {
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* cursor = strstr(pQInfo->sql, " t_");
  if (cursor != NULL) {
    cursor += 3;  // step over " t_"
    delayMs = atoi(cursor);

    // advance past the digits to reach the unit character
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600 * 1000;
        break;
      case 'm':
        delayMs *= 60 * 1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }

  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  for (int sleptMs = 0; sleptMs < delayMs; sleptMs += 1000) {
    taosMsleep(1000);
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

L
Liu Jicong 已提交
114
/*
 * Run one step of an executor task: pull the next result block from the root
 * operator.
 *
 * tinfo     task handle created by qCreateExecTask()
 * pRes      out: next result block, or NULL when none was produced
 * useconds  out: written with the accumulated elapsed time only when *pRes
 *           is NULL
 *
 * Only one thread may execute a task at a time. Ownership is claimed with a
 * compare-and-swap on pTaskInfo->owner and is released on every exit path
 * after a successful claim; qKillTask() waits for that release.
 *
 * Returns TSDB_CODE_QRY_IN_EXEC when another thread owns the task,
 * TSDB_CODE_SUCCESS when the task was already killed, the code delivered
 * through longjmp() when execution aborts, and pTaskInfo->code otherwise.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    // Ownership was NOT acquired here, so it must not be released either.
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    // Release ownership; otherwise qKillTask() would wait forever and every
    // later qExecTask() call would be rejected as "in exec".
    atomic_store_64(&pTaskInfo->owner, 0);
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    // The abort path also ends this thread's execution: give ownership back.
    atomic_store_64(&pTaskInfo->owner, 0);
    return ret;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Mark a task as killed and block until no thread owns it any more.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS once pTaskInfo->owner has dropped back to 0.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Poll every 100 ms until the executing thread has cleared the owner
  // field, which it does when it stops running the task.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
187
/*
 * Mark a task as killed without waiting for the executing thread to stop.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
200
/*
 * Log the final row count and cost summary of a task, then destroy it.
 * A NULL handle is ignored.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qTaskHandle;
  if (pTask != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTask), pTask->pRoot->resultInfo.totalRows);

    // print the query cost summary before the task is torn down
    queryCostStatis(pTask);
    doDestroyTask(pTask);
  }
}
D
dapan1121 已提交
211

L
Liu Jicong 已提交
212 213 214
/*
 * Collect explain/execution info for the task's operator tree, starting at
 * its root operator.
 *
 * resNum and pRes are handed straight to getOperatorExplainExecInfo(), whose
 * return value is returned unchanged. The capacity counter starts at 0 and
 * is local to this call.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
  int32_t        allocated = 0;
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;

  return getOperatorExplainExecInfo(pRootOp, pRes, &allocated, resNum);
}

219 220
/*
 * Serialize the state of the task's operator tree.
 *
 * tinfo    task handle
 * pOutput  out: buffer produced by encodeOperator()
 * len      out: length value produced by encodeOperator()
 *
 * When encoding succeeds but encodeOperator() reports that no operator
 * carried a value, the buffer is released and *len is reset to 0.
 *
 * Returns TSDB_CODE_INVALID_PARA when the handle is NULL or the task has no
 * root operator, otherwise the result of encodeOperator().
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  // Comparison, not assignment: with `=` this branch could never be taken.
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

234
/*
 * Restore operator state from a buffer by passing it to decodeOperator()
 * together with the task's root operator.
 *
 * Returns TSDB_CODE_INVALID_PARA when the handle or the input is NULL or the
 * length is 0, otherwise the result of decodeOperator().
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTask = (struct SExecTaskInfo*)tinfo;

  if (pTask != NULL && pInput != NULL && len != 0) {
    return decodeOperator(pTask->pRoot, pInput, len);
  }

  return TSDB_CODE_INVALID_PARA;
}

L
Liu Jicong 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
/*
 * Walk down the operator chain from the task's root until the stream-scan
 * operator is reached, and hand back its info pointer through *scanner.
 *
 * Every operator above the stream scan is asserted to have exactly one
 * downstream operator. Returns 0 once the scanner has been found.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pCur = pTask->pRoot;; pCur = pCur->pDownstream[0]) {
    uint8_t type = pCur->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pCur->info;
      return 0;
    }

    ASSERT(pCur->numOfDownstream == 1);
  }
}

260
#if 0
L
Liu Jicong 已提交
261 262 263 264 265 266
/*
 * Currently compiled out by the surrounding "#if 0".
 *
 * Writes pItem to the input queue of a task; the task is asserted to use the
 * stream execution model. Always returns 0.
 */
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
267
#endif
L
Liu Jicong 已提交
268

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277
/*
 * Record the version range [startVer, endVer] to recover on a stream task
 * and move its recover step to STREAM_RECOVER_STEP__PREPARE.
 *
 * The task is asserted to use the stream execution model. Always returns 0.
 */
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
  pTask->streamInfo.recoverEndVer = endVer;
  pTask->streamInfo.recoverStartVer = startVer;

  return 0;
}

L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return pInfo->tqReader->pSchemaWrapper;
}

const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return &pInfo->offset;
}

L
Liu Jicong 已提交
293 294
/*
 * Return the meta block stored in the task's stream info.
 * The task is asserted to use the queue execution model.
 */
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return pTask->streamInfo.metaBlk;
}

/*
 * Copy the task's last stream status into *pOffset.
 * The task is asserted to use the queue execution model. Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  // pOffset points at an STqOffsetVal, so sizeof(*pOffset) is the same size the copy always used
  memcpy(pOffset, &pTask->streamInfo.lastStatus, sizeof(*pOffset));
  return 0;
}

L
Liu Jicong 已提交
306
/*
 * Position the stream scan of a queue-model task at the given offset.
 *
 * The offset is always stored as the task's prepareStatus. If it equals the
 * task's lastStatus nothing else happens. Otherwise the operator chain is
 * walked from the root down to the stream-scan operator, every visited
 * operator's status is set to OP_OPENED, and then:
 *   - TMQ_OFFSET__LOG:           the tq reader is moved to version
 *                                pOffset->version + 1 via tqSeekVer();
 *   - TMQ_OFFSET__SNAPSHOT_DATA: the table scan is pointed at table
 *                                pOffset->uid and its reader is reset with a
 *                                start key of ts + 1. A uid of 0 selects the
 *                                first table of the task's table list with
 *                                ts = INT64_MIN.
 * Any other offset type trips an assertion.
 *
 * Returns 0 on success, -1 when tqSeekVer() fails or when uid is 0 and the
 * task has no tables.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    while (1) {
      uint8_t type = pOperator->operatorType;
      pOperator->status = OP_OPENED;
      if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
        SStreamScanInfo* pInfo = pOperator->info;
        if (pOffset->type == TMQ_OFFSET__LOG) {
#if 0
          if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
              pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
            qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
                   pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
            ASSERT(0);
          }
#endif
          if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
            return -1;
          }
          ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
        } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
          int64_t uid = pOffset->uid;
          int64_t ts = pOffset->ts;

          // uid 0: start from the first table of the task, before any timestamp
          if (uid == 0) {
            if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
              STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
              uid = pTableInfo->uid;
              ts = INT64_MIN;
            } else {
              return -1;
            }
          }

          // locate the table with this uid and make it the current table of the scan
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
          bool            found = false;
          for (int32_t i = 0; i < tableSz; i++) {
            STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
            if (pTableInfo->uid == uid) {
              found = true;
              pTableScanInfo->currentTable = i;
              break;
            }
          }

          // TODO after dropping table, table may be not found
          ASSERT(found);

          // Reset the reader with a start key of ts + 1, then restore the
          // original start key in the scan condition.
          tsdbSetTableId(pTableScanInfo->dataReader, uid);
          int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
          pTableScanInfo->cond.twindows.skey = ts + 1;
          tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
          pTableScanInfo->cond.twindows.skey = oldSkey;
          pTableScanInfo->scanTimes = 0;

          // uid and ts are int64_t: PRId64 is the matching specifier on every platform
          qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d",
                 uid, ts, pTableScanInfo->currentTable, tableSz);
        } else {
          ASSERT(0);
        }
        return 0;
      } else {
        ASSERT(pOperator->numOfDownstream == 1);
        pOperator = pOperator->pDownstream[0];
      }
    }
  }
  return 0;
}

L
Liu Jicong 已提交
385
#if 0
L
Liu Jicong 已提交
386
/*
 * Currently compiled out by the surrounding "#if 0".
 *
 * Prepare a scan at (uid, ts) on the task's root operator via
 * doPrepareScan(). A uid of 0 is replaced by the uid of the first table in
 * the task's table list (with ts = INT64_MIN) when that list is not empty.
 */
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  bool useFirstTable = (uid == 0) && (taosArrayGetSize(pTask->tableqinfoList.pTableList) != 0);
  if (useFirstTable) {
    STableKeyInfo* pFirst = taosArrayGet(pTask->tableqinfoList.pTableList, 0);
    uid = pFirst->uid;
    ts = INT64_MIN;
  }

  return doPrepareScan(pTask->pRoot, uid, ts);
}

L
Liu Jicong 已提交
400 401
/*
 * Currently compiled out by the surrounding "#if 0".
 *
 * Return the result of doGetScanStatus() for the task's root operator; uid
 * and ts are passed through to that call.
 */
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;
  return doGetScanStatus(pRootOp, uid, ts);
}
L
Liu Jicong 已提交
405
#endif