clientInt.h 6.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_CLIENTINT_H
#define TDENGINE_CLIENTINT_H

#ifdef __cplusplus
extern "C" {
#endif

#include "taos.h"
H
Haojun Liao 已提交
24
#include "common.h"
H
Hongze Cheng 已提交
25
#include "tmsg.h"
26 27
#include "tdef.h"
#include "tep.h"
28 29
#include "thash.h"
#include "tlist.h"
30
#include "tmsgtype.h"
31
#include "trpc.h"
H
Haojun Liao 已提交
32
#include "query.h"
33

L
Liu Jicong 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
// Heartbeat interval; the value is expressed in milliseconds.
#define HEARTBEAT_INTERVAL 1500

// Forward declaration so that SAppHbMgr can hold a pointer to SAppInstInfo
// before the struct body is defined.
typedef struct SAppInstInfo SAppInstInfo;

// Callback type that handles a single heartbeat response.
//   pRsp: the heartbeat response to process.
// Returns an int32_t status value.
// NOTE(review): the meaning of the return value is not visible in this header - confirm.
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pRsp);

// Per-cluster heartbeat manager (SClientHbMgr keeps one of these for each cluster).
typedef struct SAppHbMgr {
  /* ---- statistics ---- */
  int32_t       reportCnt;
  int32_t       connKeyCnt;
  int64_t       reportBytes;   // not implemented
  int64_t       startTime;

  /* ---- control ---- */
  SRWLatch      lock;          // lock is used in serialization

  /* ---- connection ---- */
  SAppInstInfo *pAppInstInfo;

  /* ---- info tables ---- */
  SHashObj     *activeInfo;    // hash<SClientHbKey, SClientHbReq>
  SHashObj     *getInfoFuncs;  // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;

// Top-level heartbeat manager: holds the control state, the heartbeat thread
// handle, the per-cluster managers and the response handlers.
typedef struct SClientHbMgr {
  int8_t          inited;

  /* ---- control ---- */
  int8_t          threadStop;
  pthread_t       thread;
  pthread_mutex_t lock;                        // used when app init and cleanup
  SArray         *appHbMgrs;                   // SArray<SAppHbMgr*>, one for each cluster
  FHbRspHandle    handle[HEARTBEAT_TYPE_MAX];  // one handler slot per heartbeat type index
} SClientHbMgr;

// Callback type registered per connection key (see the getInfoFuncs table in SAppHbMgr).
//   connKey: key of the connection the callback is invoked for.
//   param:   opaque argument. TODO: embed param into function
// Return type: SArray<Skv>
typedef SArray *(*FGetConnInfo)(SClientHbKey connKey, void *param);

69
// Timestamps recorded along the life of one request.
// (Stray page-viewer line-number residue that sat between the members has
// been removed; the members and their order are unchanged.)
typedef struct SQueryExecMetric {
  int64_t start;   // start timestamp
  int64_t parsed;  // start to parse
  int64_t send;    // start to send to server
  int64_t rsp;     // receive response from server
} SQueryExecMetric;

76
// Counters summarising insert and query activity; embedded in SAppInstInfo as `summary`.
// (Stray page-viewer line-number residue that sat between the members has
// been removed; the members and their order are unchanged.)
typedef struct SInstanceSummary {
  /* insert side */
  uint64_t numOfInsertsReq;
  uint64_t numOfInsertRows;
  uint64_t insertElapsedTime;
  uint64_t insertBytes;       // submit to tsdb since launched.

  /* query / request side */
  uint64_t fetchBytes;
  uint64_t queryElapsedTime;
  uint64_t numOfSlowQueries;
  uint64_t totalRequests;
  uint64_t currentRequests;   // the number of SRequestObj
} SInstanceSummary;
88 89 90 91 92

// Holder for the heartbeat timer handle.
typedef struct SHeartBeatInfo {
  void *pTimer;  // timer, used to send request msg to mnode
} SHeartBeatInfo;

L
Liu Jicong 已提交
93 94 95 96
struct SAppInstInfo {
  int64_t          numOfConns;
  SCorEpSet        mgmtEp;
  SInstanceSummary summary;
97
  SList            *pConnList;  // STscObj linked list
L
Liu Jicong 已提交
98
  int64_t          clusterId;
99
  void             *pTransporter;
L
Liu Jicong 已提交
100 101
  struct SAppHbMgr *pAppHbMgr;
};
102 103 104

typedef struct SAppInfo {
  int64_t        startTime;
105
  char           appName[TSDB_APP_NAME_LEN];
106 107 108
  char          *ep;
  int32_t        pid;
  int32_t        numOfThreads;
H
Haojun Liao 已提交
109

110 111 112 113
  SHashObj      *pInstMap;
} SAppInfo;

typedef struct STscObj {
114 115 116 117 118
  char             user[TSDB_USER_LEN];
  char             pass[TSDB_PASSWORD_LEN];
  char             db[TSDB_DB_FNAME_LEN];
  int32_t          acctId;
  uint32_t         connId;
L
Liu Jicong 已提交
119
  int32_t          connType;
120 121 122 123 124
  uint64_t         id;       // ref ID returned by taosAddRef
  void            *pTransporter;
  pthread_mutex_t  mutex;     // used to protect the operation on db
  int32_t          numOfReqs; // number of sqlObj from this tscObj
  SAppInstInfo    *pAppInfo;
125 126
} STscObj;

L
Liu Jicong 已提交
127 128 129 130
// Message-queue consumer handle; at present it only carries the connection object.
typedef struct SMqConsumer {
  STscObj *pTscObj;  // connection object associated with this consumer
} SMqConsumer;

131 132
typedef struct SReqResultInfo {
  const char  *pRspMsg;
H
Haojun Liao 已提交
133 134
  const char  *pData;
  TAOS_FIELD  *fields;
135
  uint32_t     numOfCols;
H
Haojun Liao 已提交
136 137 138
  int32_t     *length;
  TAOS_ROW     row;
  char       **pCol;
139 140
  uint32_t     numOfRows;
  uint32_t     current;
141
  bool         completed;
142 143
} SReqResultInfo;

144 145 146 147 148 149 150
// State carried for a "show" request; embedded in SRequestSendRecvBody as `showInfo`.
typedef struct SShowReqInfo {
  int64_t  execId;        // showId/queryId
  int32_t  vgId;
  SArray  *pArray;        // SArray<SVgroupInfo>
  int32_t  currentIndex;  // current accessed vgroup index.
} SShowReqInfo;

151 152 153
typedef struct SRequestSendRecvBody {
  tsem_t          rspSem;        // not used now
  void*           fp;
154
  SShowReqInfo    showInfo;      // todo this attribute will be removed after the query framework being completed.
155
  struct SSchJob *pQueryJob;     // query job, created according to sql query DAG.
156
  SDataBuf        requestMsg;
157 158
  SReqResultInfo  resInfo;
} SRequestSendRecvBody;
159

160 161
// Default size of an error-message buffer.
// NOTE(review): presumably a byte count used for SRequestObj.msgBuf - confirm.
#define ERROR_MSG_BUF_DEFAULT_SIZE 512

162 163 164 165 166
typedef struct SRequestObj {
  uint64_t         requestId;
  int32_t          type;   // request type
  STscObj         *pTscObj;
  char            *sqlstr;  // sql string
X
Xiaoyu Wang 已提交
167
  int32_t          sqlLen;
168 169 170
  int64_t          self;
  char            *msgBuf;
  void            *pInfo;   // sql parse info, generated by parser module
171
  int32_t          code;
H
Haojun Liao 已提交
172 173
  SQueryExecMetric metric;
  SRequestSendRecvBody body;
174 175
} SRequestObj;

176
// Global application-info object; defined in a source file, only declared here.
extern SAppInfo appInfo;
H
Haojun Liao 已提交
177
// Global int32_t defined elsewhere.
// NOTE(review): presumably the reference-pool id used for request objects - confirm.
extern int32_t clientReqRefPool;
178
// Global int32_t defined elsewhere.
// NOTE(review): presumably the reference-pool id used for connection objects - confirm.
extern int32_t clientConnRefPool;
179

H
Haojun Liao 已提交
180
// Table of TDMT_MAX response-handler function pointers, defined elsewhere.
// Each entry takes (param, pMsg, code) and returns int - the same signature
// as genericRspCallback.
extern int (*handleRequestRspFp[TDMT_MAX])(void *param, const SDataBuf *pMsg, int32_t code);
H
Haojun Liao 已提交
181
// Response callback whose signature matches the entries of handleRequestRspFp.
//   param: opaque caller-supplied pointer.
//   pMsg:  message buffer passed to the callback (read-only through this pointer).
//   code:  int32_t code accompanying the message.
// Returns an int. NOTE(review): return-value semantics are not visible here - confirm.
int genericRspCallback(void *param, const SDataBuf *pMsg, int32_t code);
182
// Builds an SMsgSendInfo for the given request.
//   pRequest: the request the message info is built for.
// Returns a pointer to the SMsgSendInfo.
// NOTE(review): ownership of the returned object and the failure value are not
// visible in this header - confirm.
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest);
183 184 185

// Initialization entry point; takes no arguments and returns an int.
// Declared with (void) so that C compilers treat this as a real prototype and
// reject calls that pass arguments.
int   taos_init(void);

186
// Creates a connection object from the given user, auth string, database name
// and owning SAppInstInfo.
// Returns an untyped pointer. NOTE(review): presumably the new STscObj, to be
// released with destroyTscObj - confirm.
void *createTscObj(const char *user, const char *auth, const char *db,
                   SAppInstInfo *pAppInfo);
187
// Destroys the object passed as an untyped pointer.
// NOTE(review): presumably expects a pointer obtained from createTscObj - confirm.
void destroyTscObj(void *pObj);
188

189 190
// Returns a 64-bit request id; takes no arguments.
// Declared with (void) so that C compilers treat this as a real prototype.
uint64_t generateRequestId(void);

191 192 193 194 195
// Request lifecycle.
// createRequest returns an untyped pointer.
// NOTE(review): presumably the new SRequestObj, released by destroyRequest - confirm.
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type);
void  destroyRequest(SRequestObj *pRequest);

// Accessors for the database name associated with a connection object.
// NOTE(review): whether the string returned by getConnectionDB must be freed by
// the caller is not visible in this header - confirm.
char *getConnectionDB(STscObj *pObj);
void  setConnectionDB(STscObj *pTscObj, const char *db);
196 197

// Initialization routine; takes no arguments and returns nothing.
// NOTE(review): presumably the worker behind taos_init - confirm.
void taos_init_imp(void);
H
Haojun Liao 已提交
198
// Applies the option `option` using the string value `str`; returns an int.
// NOTE(review): return-value semantics are not visible here - confirm.
int taos_options_imp(TSDB_OPTION option, const char *str);
199

H
Haojun Liao 已提交
200
// Opens a transporter for the given user/auth pair with the requested number
// of threads and returns it as an untyped pointer.
// NOTE(review): the failure value and how the transporter is closed are not
// visible here - confirm.
void *openTransporter(const char *user, const char *auth, int32_t numOfThreads);
201 202

// Handler for a message received from the server.
//   parent: opaque pointer supplied with the message.
//   pMsg:   the received RPC message.
//   pEpSet: endpoint set delivered with the message.
void processMsgFromServer(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
203

204 205
// Initializes the message-handler function pointers; takes no arguments.
// NOTE(review): presumably fills the handleRequestRspFp table - confirm.
// Declared with (void) so that C compilers treat this as a real prototype.
void initMsgHandleFp(void);

206 207
// Internal connect routine taking host (ip), credentials (user, pass, auth),
// database name and port; returns a TAOS handle.
// NOTE(review): which of pass/auth takes precedence and the failure value are
// not visible in this header - confirm.
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
                            const char *auth, const char *db, uint16_t port);

H
Haojun Liao 已提交
208
// Fetches a row for the given request and returns it as an untyped pointer.
// NOTE(review): the end-of-data value (likely NULL) is not visible here - confirm.
void *doFetchRow(SRequestObj *pRequest);
209

H
Haojun Liao 已提交
210
// Sets up the data pointers of pResultInfo for the given column metadata,
// column count and row count.
void setResultDataPtr(SReqResultInfo *pResultInfo, TAOS_FIELD *pFields,
                      int32_t numOfCols, int32_t numOfRows);
211

L
Liu Jicong 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
// --- heartbeat
// global, called by mgmt
// The no-argument functions are declared with (void) so that C compilers
// treat them as real prototypes and reject calls that pass arguments.
int  hbMgrInit(void);
void hbMgrCleanUp(void);
int  hbHandleRsp(SClientHbBatchRsp* hbRsp);

// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);

// conn level
// hbRegisterConn associates the callback `func` with `connKey` in the given
// manager; hbDeregisterConn takes the same manager/key pair.
int  hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);

// Adds a key/value pair (with explicit byte lengths) for the connection `connKey`.
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);

// --- mq
void hbMgrInitMqHbRspHandle(void);


232 233 234 235 236
#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_CLIENTINT_H