clientInt.h 7.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_CLIENTINT_H
#define TDENGINE_CLIENTINT_H

#ifdef __cplusplus
extern "C" {
#endif

#include "taos.h"
H
Haojun Liao 已提交
24
#include "common.h"
H
Hongze Cheng 已提交
25
#include "tmsg.h"
26 27
#include "tdef.h"
#include "tep.h"
28 29
#include "thash.h"
#include "tlist.h"
30
#include "tmsgtype.h"
31
#include "trpc.h"
H
Haojun Liao 已提交
32
#include "query.h"
33

L
Liu Jicong 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
#define HEARTBEAT_INTERVAL 1500  // ms

typedef struct SAppInstInfo SAppInstInfo;

typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);

typedef struct SAppHbMgr {
  // statistics
  int32_t reportCnt;
  int32_t connKeyCnt;
  int64_t reportBytes;  // not implemented
  int64_t startTime;
  // ctl
  SRWLatch lock;  // lock is used in serialization
  // connection
  SAppInstInfo* pAppInstInfo;
  // info
  SHashObj* activeInfo;    // hash<SClientHbKey, SClientHbReq>
  SHashObj* getInfoFuncs;  // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;

typedef struct SClientHbMgr {
  int8_t inited;
  // ctl
  int8_t          threadStop;
  pthread_t       thread;
  pthread_mutex_t lock;       // used when app init and cleanup
  SArray*         appHbMgrs;  // SArray<SAppHbMgr*> one for each cluster
  FHbRspHandle    handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;

// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);

69
typedef struct SQueryExecMetric {
70 71 72 73
  int64_t      start;    // start timestamp
  int64_t      parsed;   // start to parse
  int64_t      send;     // start to send to server
  int64_t      rsp;      // receive response from server
74 75
} SQueryExecMetric;

76
typedef struct SInstanceSummary {
77 78 79 80 81 82 83 84 85 86
  uint64_t     numOfInsertsReq;
  uint64_t     numOfInsertRows;
  uint64_t     insertElapsedTime;
  uint64_t     insertBytes;         // submit to tsdb since launched.

  uint64_t     fetchBytes;
  uint64_t     queryElapsedTime;
  uint64_t     numOfSlowQueries;
  uint64_t     totalRequests;
  uint64_t     currentRequests;      // the number of SRequestObj
87
} SInstanceSummary;
88 89 90 91 92

typedef struct SHeartBeatInfo {
  void  *pTimer;   // timer, used to send request msg to mnode
} SHeartBeatInfo;

L
Liu Jicong 已提交
93 94 95 96
struct SAppInstInfo {
  int64_t          numOfConns;
  SCorEpSet        mgmtEp;
  SInstanceSummary summary;
97
  SList            *pConnList;  // STscObj linked list
L
Liu Jicong 已提交
98
  int64_t          clusterId;
99
  void             *pTransporter;
L
Liu Jicong 已提交
100 101
  struct SAppHbMgr *pAppHbMgr;
};
102 103 104

typedef struct SAppInfo {
  int64_t        startTime;
105
  char           appName[TSDB_APP_NAME_LEN];
106 107 108
  char          *ep;
  int32_t        pid;
  int32_t        numOfThreads;
H
Haojun Liao 已提交
109

110 111 112 113
  SHashObj      *pInstMap;
} SAppInfo;

typedef struct STscObj {
114 115 116 117 118
  char             user[TSDB_USER_LEN];
  char             pass[TSDB_PASSWORD_LEN];
  char             db[TSDB_DB_FNAME_LEN];
  int32_t          acctId;
  uint32_t         connId;
L
Liu Jicong 已提交
119
  int32_t          connType;
120 121
  uint64_t         id;       // ref ID returned by taosAddRef
  pthread_mutex_t  mutex;     // used to protect the operation on db
H
Haojun Liao 已提交
122
  int32_t          numOfReqs; // number of sqlObj bound to this connection
123
  SAppInstInfo    *pAppInfo;
124 125
} STscObj;

L
Liu Jicong 已提交
126 127 128 129
typedef struct SMqConsumer {
  STscObj* pTscObj;
} SMqConsumer;

130 131
typedef struct SReqResultInfo {
  const char  *pRspMsg;
H
Haojun Liao 已提交
132 133
  const char  *pData;
  TAOS_FIELD  *fields;
134
  uint32_t     numOfCols;
H
Haojun Liao 已提交
135 136 137
  int32_t     *length;
  TAOS_ROW     row;
  char       **pCol;
138
  uint32_t     numOfRows;
139
  uint64_t     totalRows;
140
  uint32_t     current;
141
  bool         completed;
142 143
} SReqResultInfo;

144 145 146 147 148 149 150
typedef struct SShowReqInfo {
  int64_t         execId;        // showId/queryId
  int32_t         vgId;
  SArray         *pArray;        // SArray<SVgroupInfo>
  int32_t         currentIndex;  // current accessed vgroup index.
} SShowReqInfo;

151
typedef struct SRequestSendRecvBody {
H
Haojun Liao 已提交
152 153 154 155 156 157 158
  tsem_t            rspSem;        // not used now
  void*             fp;
  SShowReqInfo      showInfo;      // todo this attribute will be removed after the query framework being completed.
  SDataBuf          requestMsg;
  struct SSchJob   *pQueryJob;     // query job, created according to sql query DAG.
  struct SQueryDag *pDag;          // the query dag, generated according to the sql statement.
  SReqResultInfo    resInfo;
159
} SRequestSendRecvBody;
160

161 162
#define ERROR_MSG_BUF_DEFAULT_SIZE  512

163 164 165 166 167
typedef struct SRequestObj {
  uint64_t         requestId;
  int32_t          type;   // request type
  STscObj         *pTscObj;
  char            *sqlstr;  // sql string
X
Xiaoyu Wang 已提交
168
  int32_t          sqlLen;
169 170 171
  int64_t          self;
  char            *msgBuf;
  void            *pInfo;   // sql parse info, generated by parser module
172
  int32_t          code;
H
Haojun Liao 已提交
173 174
  SQueryExecMetric metric;
  SRequestSendRecvBody body;
175 176
} SRequestObj;

177
extern SAppInfo appInfo;
H
Haojun Liao 已提交
178
extern int32_t  clientReqRefPool;
179
extern int32_t  clientConnRefPool;
180

H
Haojun Liao 已提交
181
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
182
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
183
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
184 185 186

int   taos_init();

187
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
188
void  destroyTscObj(void*pObj);
189

190 191
uint64_t generateRequestId();

192 193 194 195 196
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void  destroyRequest(SRequestObj* pRequest);

char *getConnectionDB(STscObj* pObj);
void  setConnectionDB(STscObj* pTscObj, const char* db);
197 198

void taos_init_imp(void);
H
Haojun Liao 已提交
199
int  taos_options_imp(TSDB_OPTION option, const char *str);
200

H
Haojun Liao 已提交
201
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
202 203

void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
204

205 206
void initMsgHandleFp();

207 208
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);

H
Haojun Liao 已提交
209
void *doFetchRow(SRequestObj* pRequest);
210

H
Haojun Liao 已提交
211
void  setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
212

L
Liu Jicong 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
// --- heartbeat 
// global, called by mgmt
int  hbMgrInit();
void hbMgrCleanUp();
int  hbHandleRsp(SClientHbBatchRsp* hbRsp);

// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);

// conn level
int  hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);

int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);

// --- mq
void hbMgrInitMqHbRspHandle();


233 234 235 236 237
#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_CLIENTINT_H