executor.h 5.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_EXECUTOR_H_
#define _TD_EXECUTOR_H_

#ifdef __cplusplus
extern "C" {
#endif

D
dapan1121 已提交
23
#include "query.h"
L
Liu Jicong 已提交
24
#include "tcommon.h"
25
#include "tmsgcb.h"
D
dapan1121 已提交
26

H
Haojun Liao 已提交
27
// Opaque handle to an executor task; it is the `tinfo` argument of the functions declared in this header.
// It is a plain void*, so the compiler does not distinguish it from other void* handles (e.g. DataSinkHandle).
typedef void* qTaskInfo_t;
H
Haojun Liao 已提交
28
// Opaque data sink handle; qCreateExecTask() takes a DataSinkHandle* as its `handle` argument.
typedef void* DataSinkHandle;
S
Shengliang 已提交
29
// Forward declaration: only a pointer to it is needed here (see qProcessRspMsg()).
struct SRpcMsg;
H
Haojun Liao 已提交
30 31
// Forward declaration: only a pointer to it is needed here (see qCreateExecTask()).
struct SSubplan;

32
/**
 * Bundle of pointers, flags and counters handed to the executor when a task is created: it is
 * the `readers` / `readHandle` argument of qCreateStreamExecTaskInfo(),
 * qCreateQueueExecTaskInfo() and qCreateExecTask().
 *
 * Most members are untyped `void*`, so their concrete types are not visible in this header
 * (only sContext is annotated, as SSnapContext*).
 * NOTE(review): which members must be filled in, and what the init* flags trigger, is decided
 * by the implementation - confirm there before relying on any of them.
 */
typedef struct {
L
Liu Jicong 已提交
33
  void*   tqReader;
34 35 36 37 38
  void*   meta;
  void*   config;
  void*   vnode;
  void*   mnd;
  SMsgCb* pMsgCb;
39
  int64_t version;
40 41
  bool    initMetaReader;
  bool    initTableReader;
L
Liu Jicong 已提交
42
  bool    initTqReader;
5
54liuyao 已提交
43
  int32_t numOfVgroups;
44 45

  void*   sContext;      // SSnapContext*
wmmhello's avatar
wmmhello 已提交
46

L
Liu Jicong 已提交
47
  void*   pStateBackend;
H
Haojun Liao 已提交
48
} SReadHandle;
49

L
Liu Jicong 已提交
50
// Execution model of a task; passed as the `model` argument of qCreateExecTask().
// In queue mode, data streams are separated by msg.
// The values are distinct but are NOT independent bit flags: OPTR_EXEC_MODEL_QUEUE (0x3) is
// numerically BATCH | STREAM, so compare with ==, never with a bitwise AND.
51
typedef enum {
52
  OPTR_EXEC_MODEL_BATCH = 0x1,
53
  OPTR_EXEC_MODEL_STREAM = 0x2,
L
Liu Jicong 已提交
54
  OPTR_EXEC_MODEL_QUEUE = 0x3,
55 56
} EOPTR_EXEC_MODEL;

L
Liu Jicong 已提交
57
/**
 * Create the exec task for stream mode
 * @param msg      input message the task is built from (untyped void*; its format is not
 *                 visible in this header)
 * @param readers  read handle made available to the new task
 * @return the task handle. NOTE(review): the value returned on failure (presumably NULL) is
 *         not stated in this header - confirm in the implementation.
 */
L
Liu Jicong 已提交
63
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
64

L
Liu Jicong 已提交
65
/**
 * Create the exec task for queue mode
 * @param msg        input message the task is built from (untyped void*; its format is not
 *                   visible in this header)
 * @param readers    read handle made available to the new task
 * @param numOfCols  NOTE(review): presumably an output, the number of result columns - confirm
 * @param pSchema    NOTE(review): presumably an output, the schema of the task result; who
 *                   owns *pSchema is not stated in this header - confirm
 * @return the task handle. NOTE(review): the value returned on failure (presumably NULL) is
 *         not stated in this header - confirm in the implementation.
 */
L
Liu Jicong 已提交
71
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
L
Liu Jicong 已提交
72

H
Haojun Liao 已提交
73 74 75 76 77 78 79 80
/**
 * Set multiple input data blocks for the stream scan.
 * @param tinfo        task handle
 * @param pBlocks      the input blocks (untyped; NOTE(review): the element type presumably
 *                     depends on `type` - confirm)
 * @param numOfBlocks  number of input blocks in pBlocks
 * @param type         kind of input; the accepted values are not defined in this header
 * @return status code (the success/error convention is not stated in this header)
 */
L
Liu Jicong 已提交
81
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
H
Haojun Liao 已提交
82

83 84
/**
 * @brief Cleanup SSDataBlock for StreamScanInfo
 *
 * @param tinfo  task handle whose stream-scan input block(s) are cleaned up
 */
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);

90 91 92 93 94 95 96 97
/**
 * Update the table id list, add or remove.
 *
 * @param tinfo        task handle
 * @param tableIdList  array of table ids to add or remove (element type not visible here)
 * @param isAdd        NOTE(review): presumably true adds the ids and false removes them - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
98
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
99

L
Liu Jicong 已提交
100 101 102 103 104 105 106 107 108 109
/**
 * Create the exec task object according to task json
 * NOTE(review): the summary mentions task json, but the plan is passed in as a
 * struct SSubplan* - confirm whether the summary is stale.
 * @param readHandle  read handle made available to the task
 * @param vgId        NOTE(review): presumably the vgroup id - confirm
 * @param taskId      id of the task
 * @param pPlan       sub-plan the task is created for
 * @param pTaskInfo   NOTE(review): presumably an output receiving the created task handle - confirm
 * @param handle      NOTE(review): presumably an output receiving the data sink handle - confirm
 * @param sql         NOTE(review): presumably the SQL text of the query; ownership and
 *                    lifetime are not stated in this header - confirm
 * @param model       execution model of the task
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
D
dapan1121 已提交
110
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
H
Haojun Liao 已提交
111

112 113 114 115 116 117 118
/**
 * Get version information about the table queried by a task.
 * NOTE(review): summary derived from the function name only; the implementation is not part
 * of this header.
 * @param tinfo      task handle
 * @param dbName     NOTE(review): presumably an output buffer for the database name; its
 *                   required size is not stated in this header - confirm
 * @param tableName  NOTE(review): presumably an output buffer for the table name; its
 *                   required size is not stated in this header - confirm
 * @param sversion   NOTE(review): presumably an output, the schema version - confirm
 * @param tversion   NOTE(review): presumably an output; whether "t" means tag or table
 *                   cannot be told from this header - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
119
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
L
Liu Jicong 已提交
120
                                    int32_t* tversion);
121

H
Haojun Liao 已提交
122
/**
 * The main task execution function, including query on both table and multiple tables,
 * which are decided according to the tag or table name query conditions
 *
 * @param tinfo     task handle
 * @param pResList  NOTE(review): presumably an array that receives the result blocks - confirm
 * @param useconds  NOTE(review): presumably an output, the elapsed execution time in
 *                  microseconds - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
130 131
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
/**
 * Variant of the task execution function that hands back a single SSDataBlock pointer
 * through pBlock instead of filling an array.
 * NOTE(review): how the end of the result set is signalled through *pBlock, and who owns the
 * returned block, are not stated in this header - confirm.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
H
Haojun Liao 已提交
132 133 134

/**
 * Kill the ongoing query and free the query handle and corresponding resources automatically
 * @param tinfo  qhandle (the task handle)
 * @return status code (the success/error convention is not stated in this header)
 */
H
Haojun Liao 已提交
138
int32_t qKillTask(qTaskInfo_t tinfo);
H
Haojun Liao 已提交
139

D
dapan1121 已提交
140 141
/**
 * Kill the ongoing query asynchronously
 * NOTE(review): unlike the qKillTask() comment, nothing here says the handle is freed -
 * confirm whether the caller must still call qDestroyTask().
 * @param tinfo  qhandle (the task handle)
 * @return status code (the success/error convention is not stated in this header)
 */
H
Haojun Liao 已提交
145
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
D
dapan1121 已提交
146

H
Haojun Liao 已提交
147 148 149 150
/**
 * Destroy query info structure
 * @param tinfo  task handle to destroy
 */
H
Haojun Liao 已提交
151
void qDestroyTask(qTaskInfo_t tinfo);
H
Haojun Liao 已提交
152 153 154 155 156 157

/**
 * Get the queried table uid
 * @param tinfo  task handle
 * @return the uid of the queried table
 */
H
Haojun Liao 已提交
158
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167

/**
 * Extract the qualified table id list, and then pass them to the TSDB driver to load the required table data blocks.
 *
 * @param pTableList    the table iterator to traverse all tables belonging to a super table, or an inverted index
 *                      (description kept from the former `iter` parameter - TODO confirm it still applies)
 * @param tagCond       tag condition (NOTE(review): encoding not visible in this header)
 * @param tagCondLen    length of tagCond
 * @param pTableIdList  NOTE(review): presumably the output array receiving the qualified table ids - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);

D
dapan1121 已提交
168
/**
 * Process an RPC response message.
 * NOTE(review): summary derived from the function name only; the implementation is not part
 * of this header.
 * NOTE(review): struct SEpSet is not forward-declared in this header (unlike struct SRpcMsg),
 * so it must already be declared by one of the included headers - confirm.
 * @param parent  untyped context pointer; its concrete type is not visible here
 * @param pMsg    the RPC message
 * @param pEpSet  endpoint set associated with the message
 */
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
S
Shengliang 已提交
169

170
/**
 * Collect the explain execution info of a task.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo          task handle
 * @param pExecInfoList  NOTE(review): presumably the array the info entries are appended to
 *                       (it replaced the former resNum/pRes output parameters) - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList);
D
dapan1121 已提交
171

172 173 174 175
/**
 * Serialize the status of a task.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo    task handle
 * @param pOutput  NOTE(review): presumably receives the serialized buffer; who frees it is
 *                 not stated in this header - confirm
 * @param len      NOTE(review): presumably receives the length of *pOutput - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);

/**
 * Restore the status of a task from a serialized buffer.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo   task handle
 * @param pInput  serialized input (read-only)
 * @param len     length of pInput
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);

176 177 178 179 180 181 182 183 184
/**
 * Return the scan info, in the form of a tuple of two items: the table uid and the current timestamp
 * @param tinfo  task handle
 * @param uid    output: the table uid
 * @param ts     output: the current timestamp
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);

L
Liu Jicong 已提交
185 186
/**
 * Prepare the TSDB scan of a stream task.
 * NOTE(review): summary derived from the function name only; uid/ts presumably mirror the
 * (table uid, timestamp) pair reported by qGetStreamScanStatus() - confirm.
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);

187
/**
 * Prepare the scan of a stream task at a given offset.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo    task handle
 * @param pOffset  the offset; it is not const-qualified, so the callee may modify it - confirm
 * @param subType  NOTE(review): presumably the subscription type; accepted values are not
 *                 defined in this header - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
L
Liu Jicong 已提交
188 189 190

/**
 * Extract the current offset of a stream task.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo    task handle
 * @param pOffset  NOTE(review): presumably an output receiving the offset - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);

wmmhello's avatar
wmmhello 已提交
191
/**
 * Extract the meta message of a stream task.
 * NOTE(review): summary derived from the function name only; ownership and lifetime of the
 * returned pointer are not stated in this header - confirm before freeing or caching it.
 */
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
L
Liu Jicong 已提交
192

wmmhello's avatar
wmmhello 已提交
193 194 195 196 197 198
/**
 * Extract the prepared uid of a stream task.
 * NOTE(review): summary derived from the function name only; the value returned when no uid
 * is available is not stated in this header - confirm.
 */
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo);

/**
 * Extract the schema held by a task.
 * The result is const-qualified, so the caller must not modify it through this pointer.
 * NOTE(review): summary derived from the function name only; the lifetime of the returned
 * pointer (presumably tied to the task) is not stated in this header - confirm.
 */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);

/**
 * Extract the table name held by a task.
 * The result is const-qualified, so the caller must not modify it through this pointer.
 * NOTE(review): summary derived from the function name only; the lifetime of the returned
 * string (presumably tied to the task) is not stated in this header - confirm.
 */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);

L
Liu Jicong 已提交
199 200
/**
 * Extract the reader from a stream scanner.
 * NOTE(review): summary derived from the function name only; both the argument and the
 * result are untyped void*, so their concrete types are not visible in this header.
 */
void* qExtractReaderFromStreamScanner(void* scanner);

L
Liu Jicong 已提交
201 202
/**
 * Extract the stream scanner of a task.
 * NOTE(review): summary derived from the function name only.
 * @param tinfo    task handle
 * @param scanner  NOTE(review): presumably an output receiving the scanner as an untyped
 *                 pointer - confirm
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);

L
Liu Jicong 已提交
203 204
/**
 * Feed one input item to a stream task.
 * NOTE(review): summary derived from the function name only; pItem is untyped, so the
 * expected item type is not visible in this header.
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);

L
Liu Jicong 已提交
205 206
/**
 * Prepare a stream task for recovery over a version range.
 * NOTE(review): summary derived from the function and parameter names only; whether endVer
 * is inclusive is not stated in this header - confirm.
 * @return status code (the success/error convention is not stated in this header)
 */
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer);

H
Haojun Liao 已提交
207 208
/**
 * Compute the aligned query time window for a key; the window is returned by value.
 * NOTE(review): summary derived from the function name only.
 * @param pInterval  interval definition (not const-qualified - confirm whether it is modified)
 * @param precision  NOTE(review): presumably the timestamp precision of `key` - confirm
 * @param key        NOTE(review): presumably the timestamp the window is aligned around - confirm
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);

H
Hongze Cheng 已提交
209 210 211 212
#ifdef __cplusplus
}
#endif

213
#endif /*_TD_EXECUTOR_H_*/