// querytask.h
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_QUERYTASK_H
#define TDENGINE_QUERYTASK_H

#ifdef __cplusplus
extern "C" {
#endif

// Expands to the `id.str` member of the task that _t points to.
// _t is cast to SExecTaskInfo* without any check, so the caller must pass a
// pointer that really addresses an SExecTaskInfo.
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)

// Identifiers carried by a task; embedded in SExecTaskInfo as its `id` member.
typedef struct STaskIdInfo {
  uint64_t queryId;  // this is also a request id
  uint64_t subplanId;
  uint64_t templateId;
  char*    str;  // value that GET_TASKID() yields; ownership is not visible in this header
  int32_t  vgId;
} STaskIdInfo;

// Timing/cost bookkeeping for a task; embedded in SExecTaskInfo as its `cost` member.
// NOTE(review): the units of the time fields are not stated in this header -- confirm
// against the code that fills them in.
typedef struct STaskCostInfo {
  int64_t                 created;
  int64_t                 start;
  uint64_t                elapsedTime;
  double                  extractListTime;
  double                  groupIdMapTime;
  SFileBlockLoadRecorder* pRecoder;  // sic: presumably "pRecorder"; spelling kept because it is an identifier
} STaskCostInfo;

// A latch paired with an array of stop-info entries; embedded in SExecTaskInfo as
// its `stopInfo` member.
typedef struct STaskStopInfo {
  SRWLatch lock;       // presumably guards pStopInfo -- confirm against the users of this struct
  SArray*  pStopInfo;  // element type is not visible here; presumably SExchangeOpStopInfo (see qAppendTaskStopInfo)
} STaskStopInfo;

// Per-task execution state. This header defines only the struct tag; the
// `SExecTaskInfo` typedef name used by GET_TASKID() and the prototypes in this
// file is not declared here and is presumably provided by another header.
struct SExecTaskInfo {
  STaskIdInfo           id;  // identifiers; id.str is what GET_TASKID() reads
  uint32_t              status;
  STimeWindow           window;
  STaskCostInfo         cost;
  int64_t               owner;  // if it is in execution
  int32_t               code;
  int32_t               qbufQuota;  // total available buffer (in KB) during query execution
  int64_t               version;    // used by stream to record the WAL version; TODO(review): consider moving to SSchemaInfo
  SStreamTaskInfo       streamInfo;
  SSchemaInfo           schemaInfo;
  const char*           sql;        // query sql string
  jmp_buf               env;        // jump to this position when an error happens
  EOPTR_EXEC_MODEL      execModel;  // operator execution model [batch model|stream model]
  SSubplan*             pSubplan;
  struct SOperatorInfo* pRoot;
  SLocalFetch           localFetch;
  SArray*               pResultBlockList;  // result block list
  STaskStopInfo         stopInfo;
  SRWLatch              lock;  // secures access to STableListInfo
};

/*
 * Task-level entry points. Only the declarations are visible in this header, so
 * the notes below are limited to what the signatures show; anything marked
 * "presumably" should be confirmed against the definitions.
 */

// Presumably formats an identifier derived from taskId and queryId into dst.
// The capacity dst must have is not expressed in the signature.
// Note the argument order: taskId comes first here, while doCreateTask() takes
// queryId first.
void           buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);

// Returns a task pointer for the given ids, vgroup id and execution model.
// Presumably NULL on failure -- confirm against the definition.
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model);

// Presumably the counterpart of doCreateTask().
void           doDestroyTask(SExecTaskInfo* pTaskInfo);

bool           isTaskKilled(SExecTaskInfo* pTaskInfo);
void           setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);

// NOTE(review): `status` is int8_t here, whereas SExecTaskInfo.status is declared
// uint32_t.
void           setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);

// Returns an int32_t code and hands the task back through *pTaskInfo.
// NOTE(review): `sql` is a non-const char* here, whereas SExecTaskInfo.sql is
// declared const char*.
int32_t        createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                                  int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);

// Returns an int32_t code; presumably appends *pInfo to pTaskInfo->stopInfo.
int32_t        qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_QUERYTASK_H