clientInt.h 6.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_CLIENTINT_H
#define TDENGINE_CLIENTINT_H

#ifdef __cplusplus
extern "C" {
#endif

#include "taos.h"
H
Haojun Liao 已提交
24
#include "common.h"
H
Hongze Cheng 已提交
25
#include "tmsg.h"
26 27
#include "tdef.h"
#include "tep.h"
28 29
#include "thash.h"
#include "tlist.h"
30
#include "tmsgtype.h"
31
#include "trpc.h"
H
Haojun Liao 已提交
32
#include "query.h"
33

L
Liu Jicong 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
// Heartbeat interval, expressed in milliseconds.
#define HEARTBEAT_INTERVAL 1500  // ms

// Forward declaration; the struct body is defined further down in this header.
typedef struct SAppInstInfo SAppInstInfo;

// Callback type that receives a pointer to one SClientHbRsp and returns an int32_t
// (the meaning of the return value is not visible in this header).
// SClientHbMgr.handle[] stores an array of these.
// NOTE(review): the parameter is named pReq although its type is a response -- confirm naming.
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);

// Heartbeat manager tied to one SAppInstInfo (see pAppInstInfo).
// SClientHbMgr.appHbMgrs keeps an array of pointers to these, one for each cluster.
typedef struct SAppHbMgr {
  // statistics
  int32_t reportCnt;
  int32_t connKeyCnt;
  int64_t reportBytes;  // not implemented
  int64_t startTime;
  // ctl
  SRWLatch lock;  // lock is used in serialization
  // connection: back-pointer to the SAppInstInfo this manager is attached to
  SAppInstInfo* pAppInstInfo;
  // info
  SHashObj* activeInfo;    // hash<SClientHbKey, SClientHbReq>
  SHashObj* getInfoFuncs;  // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;

// Top-level heartbeat manager state: an init flag, a thread handle with its stop
// flag, a mutex, the array of SAppHbMgr pointers, and a table of response handlers.
typedef struct SClientHbMgr {
  int8_t inited;
  // ctl
  int8_t          threadStop;
  pthread_t       thread;
  pthread_mutex_t lock;       // used when app init and cleanup
  SArray*         appHbMgrs;  // SArray<SAppHbMgr*> one for each cluster
  // one FHbRspHandle slot per index below HEARTBEAT_TYPE_MAX
  FHbRspHandle    handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;

// Callback type: takes a connection key plus an opaque param pointer and returns an SArray*.
// hbRegisterConn() accepts one of these for a given connection key.
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);

69
// Timestamps recorded for the successive stages of one request; SRequestObj
// embeds one of these as `metric`. The time unit is not specified in this header.
typedef struct SQueryExecMetric {
  int64_t start;   // start timestamp
  int64_t parsed;  // start to parse
  int64_t send;    // start to send to server
  int64_t rsp;     // receive response from server
} SQueryExecMetric;

76
// Counters kept per SAppInstInfo (embedded there as `summary`).
// Field order is preserved from the original definition.
typedef struct SInstanceSummary {
  // insert side
  uint64_t numOfInsertsReq;
  uint64_t numOfInsertRows;
  uint64_t insertElapsedTime;
  uint64_t insertBytes;        // submit to tsdb since launched.

  // query / request side
  uint64_t fetchBytes;
  uint64_t queryElapsedTime;
  uint64_t numOfSlowQueries;
  uint64_t totalRequests;
  uint64_t currentRequests;    // the number of SRequestObj
} SInstanceSummary;
88 89 90 91 92

// Wrapper around a single opaque timer handle.
typedef struct SHeartBeatInfo {
  void  *pTimer;   // timer, used to send request msg to mnode
} SHeartBeatInfo;

L
Liu Jicong 已提交
93 94 95 96
struct SAppInstInfo {
  int64_t          numOfConns;
  SCorEpSet        mgmtEp;
  SInstanceSummary summary;
97
  SList            *pConnList;  // STscObj linked list
L
Liu Jicong 已提交
98
  int64_t          clusterId;
99
  void             *pTransporter;
L
Liu Jicong 已提交
100 101
  struct SAppHbMgr *pAppHbMgr;
};
102 103 104

typedef struct SAppInfo {
  int64_t        startTime;
105
  char           appName[TSDB_APP_NAME_LEN];
106 107 108
  char          *ep;
  int32_t        pid;
  int32_t        numOfThreads;
H
Haojun Liao 已提交
109

110 111 112 113
  SHashObj      *pInstMap;
} SAppInfo;

typedef struct STscObj {
114 115 116 117 118
  char             user[TSDB_USER_LEN];
  char             pass[TSDB_PASSWORD_LEN];
  char             db[TSDB_DB_FNAME_LEN];
  int32_t          acctId;
  uint32_t         connId;
L
Liu Jicong 已提交
119
  int32_t          connType;
120 121
  uint64_t         id;       // ref ID returned by taosAddRef
  pthread_mutex_t  mutex;     // used to protect the operation on db
H
Haojun Liao 已提交
122
  int32_t          numOfReqs; // number of sqlObj bound to this connection
123
  SAppInstInfo    *pAppInfo;
124 125
} STscObj;

L
Liu Jicong 已提交
126 127 128 129
// Consumer object; currently holds only a pointer to an STscObj connection object.
typedef struct SMqConsumer {
  STscObj* pTscObj;
} SMqConsumer;

130 131
typedef struct SReqResultInfo {
  const char  *pRspMsg;
H
Haojun Liao 已提交
132 133
  const char  *pData;
  TAOS_FIELD  *fields;
134
  uint32_t     numOfCols;
H
Haojun Liao 已提交
135 136 137
  int32_t     *length;
  TAOS_ROW     row;
  char       **pCol;
138 139
  uint32_t     numOfRows;
  uint32_t     current;
140
  bool         completed;
141 142
} SReqResultInfo;

143 144 145 146 147 148 149
// Per-request "show" state; embedded in SRequestSendRecvBody as `showInfo`,
// where it is flagged for removal once the query framework is completed.
typedef struct SShowReqInfo {
  int64_t         execId;        // showId/queryId
  int32_t         vgId;
  SArray         *pArray;        // SArray<SVgroupInfo>
  int32_t         currentIndex;  // current accessed vgroup index.
} SShowReqInfo;

150
typedef struct SRequestSendRecvBody {
H
Haojun Liao 已提交
151 152 153 154 155 156 157
  tsem_t            rspSem;        // not used now
  void*             fp;
  SShowReqInfo      showInfo;      // todo this attribute will be removed after the query framework being completed.
  SDataBuf          requestMsg;
  struct SSchJob   *pQueryJob;     // query job, created according to sql query DAG.
  struct SQueryDag *pDag;          // the query dag, generated according to the sql statement.
  SReqResultInfo    resInfo;
158
} SRequestSendRecvBody;
159

160 161
// Default size for an error message buffer (unit presumably bytes; presumably
// used for SRequestObj.msgBuf -- confirm against the implementation).
#define ERROR_MSG_BUF_DEFAULT_SIZE  512

162 163 164 165 166
typedef struct SRequestObj {
  uint64_t         requestId;
  int32_t          type;   // request type
  STscObj         *pTscObj;
  char            *sqlstr;  // sql string
X
Xiaoyu Wang 已提交
167
  int32_t          sqlLen;
168 169 170
  int64_t          self;
  char            *msgBuf;
  void            *pInfo;   // sql parse info, generated by parser module
171
  int32_t          code;
H
Haojun Liao 已提交
172 173
  SQueryExecMetric metric;
  SRequestSendRecvBody body;
174 175
} SRequestObj;

176
// Globals declared here and defined in a source file not visible from this header.
extern SAppInfo appInfo;
// NOTE(review): presumably reference-pool ids used with taosAddRef (cf. STscObj.id) -- confirm.
extern int32_t  clientReqRefPool;
extern int32_t  clientConnRefPool;
179

H
Haojun Liao 已提交
180
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
181
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
182
SMsgSendInfo* buildMsgInfoImpl(SRequestObj*);
183 184 185

int   taos_init();

186
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
187
void  destroyTscObj(void*pObj);
188

189 190
uint64_t generateRequestId();

191 192 193 194 195
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void  destroyRequest(SRequestObj* pRequest);

char *getConnectionDB(STscObj* pObj);
void  setConnectionDB(STscObj* pTscObj, const char* db);
196 197

void taos_init_imp(void);
H
Haojun Liao 已提交
198
int  taos_options_imp(TSDB_OPTION option, const char *str);
199

H
Haojun Liao 已提交
200
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
201 202

void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
203

204 205
void initMsgHandleFp();

206 207
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);

H
Haojun Liao 已提交
208
void *doFetchRow(SRequestObj* pRequest);
209

H
Haojun Liao 已提交
210
void  setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
211

L
Liu Jicong 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
// --- heartbeat
// global, called by mgmt
// The argument-less functions are declared with (void) so the compiler rejects
// accidental arguments, consistent with taos_init_imp(void) in this header.
int  hbMgrInit(void);
void hbMgrCleanUp(void);
int  hbHandleRsp(SClientHbBatchRsp* hbRsp);

// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
void       appHbMgrCleanup(SAppHbMgr* pAppHbMgr);

// conn level
// `func` has type FGetConnInfo; SAppHbMgr.getInfoFuncs is documented as
// hash<SClientHbKey, FGetConnInfo>.
int  hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);

// Adds a key/value pair (with explicit lengths) for the given connection key.
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);

// --- mq
void hbMgrInitMqHbRspHandle(void);


232 233 234 235 236
#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_CLIENTINT_H