/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_TSCLIENT_H
#define TDENGINE_TSCLIENT_H

#ifdef __cplusplus
extern "C" {
#endif

#include "os.h"

#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tglobal.h"
#include "tsqlfunction.h"
#include "tutil.h"

#include "qExecutor.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#include "tcmdtype.h"

// forward declarations: these struct types are only referenced through pointers in this header
struct SSqlInfo;
struct SLocalReducer;

/* origin of the data: taken from the sql string, or loaded from a data file */
enum {
  DATA_FROM_SQL_STRING = 1,  // data comes from the sql string
  DATA_FROM_DATA_FILE  = 2,  // data comes from a file
};
/* basic properties of a table; embedded as the first member of STableMeta */
typedef struct STableComInfo {
  uint8_t numOfTags;     // number of tags
  uint8_t precision;     // NOTE(review): presumably the timestamp precision of the table - confirm
  int16_t numOfColumns;  // number of columns
  int32_t rowSize;       // size of one row
} STableComInfo;
typedef struct SCMCorVgroupInfo {
  int32_t version;
  int8_t inUse;
dengyihao's avatar
dengyihao 已提交
58 59
  int8_t  numOfEps;
  SEpAddr epAddr[TSDB_MAX_REPLICA];
dengyihao's avatar
bugfix  
dengyihao 已提交
60 61
} SCMCorVgroupInfo;

typedef struct STableMeta {
H
Haojun Liao 已提交
63
  STableComInfo tableInfo;
H
hjxilinx 已提交
64 65
  uint8_t       tableType;
  int16_t       sversion;
66
  int16_t       tversion;
dengyihao's avatar
bugfix  
dengyihao 已提交
67 68
  SCMVgroupInfo  vgroupInfo;
  SCMCorVgroupInfo  corVgroupInfo;
H
hjxilinx 已提交
69 70 71
  int32_t       sid;       // the index of one table in a virtual node
  uint64_t      uid;       // unique id of a table
  SSchema       schema[];  // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
H
hjxilinx 已提交
72 73 74
} STableMeta;

typedef struct STableMetaInfo {
75
  STableMeta *  pTableMeta;      // table meta, cached in client side and acquired by name
H
hjxilinx 已提交
76
  SVgroupsInfo *vgroupList;
77 78
  SArray       *pVgroupTables;   // SArray<SVgroupTableInfo>
  
H
hjxilinx 已提交
79
  /*
H
hjxilinx 已提交
80 81
   * 1. keep the vgroup index during the multi-vnode super table projection query
   * 2. keep the vgroup index for multi-vnode insertion
H
hjxilinx 已提交
82
   */
H
hjxilinx 已提交
83
  int32_t vgroupIndex;
H
hjxilinx 已提交
84
  char    name[TSDB_TABLE_ID_LEN];        // (super) table name
85
  SArray* tagColList;                     // SArray<SColumn*>, involved tag columns
H
hjxilinx 已提交
86
} STableMetaInfo;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
H
hjxilinx 已提交
90 91 92 93 94 95
  char      aliasName[TSDB_COL_NAME_LEN];  // as aliasName
  SColIndex colInfo;
  int64_t   uid;            // refactor use the pointer
  int16_t   functionId;     // function id in aAgg array
  int16_t   resType;        // return value type
  int16_t   resBytes;       // length of return value
H
Haojun Liao 已提交
96
  int32_t   interBytes;     // inter result buffer size
H
hjxilinx 已提交
97 98 99
  int16_t   numOfParams;    // argument value of each function
  tVariant  param[3];       // parameters are not more than 3
  int32_t   offset;         // sub result column value of arithmetic expression.
H
hzcheng 已提交
100 101
} SSqlExpr;

/* identifies one column: index of the table plus index of the column */
typedef struct SColumnIndex {
  int16_t tableIndex;   // index of the table
  int16_t columnIndex;  // index of the column
} SColumnIndex;

typedef struct SFieldSupInfo {
  bool            visible;
H
hjxilinx 已提交
109
  SExprInfo      *pArithExprInfo;
H
hjxilinx 已提交
110
  SSqlExpr *      pSqlExpr;
H
hjxilinx 已提交
111
} SFieldSupInfo;
typedef struct SFieldInfo {
H
hjxilinx 已提交
114 115 116
  int16_t numOfOutput;   // number of column in result
  SArray *pFields;       // SArray<TAOS_FIELD>
  SArray *pSupportInfo;  // SArray<SFieldSupInfo>
H
hzcheng 已提交
117 118
} SFieldInfo;

typedef struct SColumn {
S
slguan 已提交
120 121 122
  SColumnIndex       colIndex;
  int32_t            numOfFilters;
  SColumnFilterInfo *filterInfo;
123
} SColumn;
/* one query condition, kept as raw condition data */
typedef struct SCond {
  uint64_t uid;
  int32_t  len;   // length of tag query condition data
  char *   cond;  // the condition data
} SCond;

typedef struct SJoinNode {
S
slguan 已提交
132
  char     tableId[TSDB_TABLE_ID_LEN];
S
slguan 已提交
133
  uint64_t uid;
H
Haojun Liao 已提交
134
  int16_t  tagColId;
S
slguan 已提交
135 136 137 138 139 140 141 142
} SJoinNode;

/* join condition made of a left and a right node */
typedef struct SJoinInfo {
  bool      hasJoin;  // whether a join condition exists
  SJoinNode left;     // left side of the join
  SJoinNode right;    // right side of the join
} SJoinInfo;

typedef struct STagCond {
S
slguan 已提交
144 145 146 147 148 149 150 151 152 153
  // relation between tbname list and query condition, including : TK_AND or TK_OR
  int16_t relType;

  // tbname query condition, only support tbname query condition on one table
  SCond tbnameCond;

  // join condition, only support two tables join currently
  SJoinInfo joinInfo;

  // for different table, the query condition must be seperated
H
hjxilinx 已提交
154
  SArray *pCond;
H
hzcheng 已提交
155 156
} STagCond;

/* description of one bound parameter; STableDataBlocks.params points to these ('?' binding) */
typedef struct SParamInfo {
  int32_t  idx;
  char     type;
  uint8_t  timePrec;
  short    bytes;
  uint32_t offset;
} SParamInfo;

typedef struct STableDataBlocks {
H
Haojun Liao 已提交
166 167 168 169 170 171 172
  char     tableId[TSDB_TABLE_ID_LEN];
  int8_t   tsSource;     // where does the UNIX timestamp come from, server or client
  bool     ordered;      // if current rows are ordered or not
  int64_t  vgId;         // virtual group id
  int64_t  prevTS;       // previous timestamp, recorded to decide if the records array is ts ascending
  int32_t  numOfTables;  // number of tables in current submit block
  int32_t  rowSize;      // row size for current table
S
slguan 已提交
173
  uint32_t nAllocSize;
H
Haojun Liao 已提交
174
  uint32_t headerSize;   // header for table info (uid, tid, submit metadata)
S
slguan 已提交
175
  uint32_t size;
176

H
hjxilinx 已提交
177
  /*
H
Haojun Liao 已提交
178
   * the table meta of table, the table meta will be used during submit, keep a ref
H
hjxilinx 已提交
179 180
   * to avoid it to be removed from cache
   */
H
hjxilinx 已提交
181
  STableMeta *pTableMeta;
H
Haojun Liao 已提交
182
  char       *pData;
S
slguan 已提交
183 184

  // for parameter ('?') binding
H
hjxilinx 已提交
185 186 187
  uint32_t    numOfAllocedParams;
  uint32_t    numOfParams;
  SParamInfo *params;
S
slguan 已提交
188
} STableDataBlocks;
//typedef struct SDataBlockList {  // todo remove
//  uint32_t           nSize;
//  uint32_t           nAlloc;
//  STableDataBlocks **pData;
//} SDataBlockList;
typedef struct SQueryInfo {
H
Haojun Liao 已提交
197 198
  int16_t          command;       // the command may be different for each subclause, so keep it seperately.
  uint32_t         type;          // query/insert/import type
199 200 201 202 203
  char             slidingTimeUnit;
  STimeWindow      window;
  int64_t          intervalTime;  // aggregation time interval
  int64_t          slidingTime;   // sliding window in mseconds
  SSqlGroupbyExpr  groupbyExpr;   // group by tags info
H
Haojun Liao 已提交
204
  SArray *         colList;       // SArray<SColumn*>
205
  SFieldInfo       fieldsInfo;
H
Haojun Liao 已提交
206
  SArray *         exprList;      // SArray<SSqlExpr*>
207 208 209 210
  SLimitVal        limit;
  SLimitVal        slimit;
  STagCond         tagCond;
  SOrderVal        order;
H
Haojun Liao 已提交
211
  int16_t          fillType;      // final result fill type
212
  int16_t          numOfTables;
H
hjxilinx 已提交
213
  STableMetaInfo **pTableMetaInfo;
214
  struct STSBuf *  tsBuf;
H
Haojun Liao 已提交
215 216 217 218
  int64_t *        fillVal;       // default value for fill
  char *           msg;           // pointer to the pCmd->payload to keep error message temporarily
  int64_t          clauseLimit;   // limit for current sub clause
  int64_t          prjOffset;     // offset value in the original sql expression, only applied at client side
219 220
} SQueryInfo;

typedef struct {
222 223
  int     command;
  uint8_t msgType;
224
  bool    autoCreated;        // create table if it is not existed during retrieve table meta in mnode
H
hjxilinx 已提交
225

226 227 228 229
  union {
    int32_t count;
    int32_t numOfTablesInSubmit;
  };
H
hzcheng 已提交
230

231
  int32_t      insertType;
232 233 234
  int32_t      clauseIndex;   // index of multiple subclause query

  char *       curSql;       // current sql, resume position of sql after parsing paused
235
  int8_t       parseFinished;
236

237 238 239 240
  short        numOfCols;
  uint32_t     allocSize;
  char *       payload;
  int32_t      payloadLen;
241 242
  SQueryInfo **pQueryInfo;
  int32_t      numOfClause;
243 244
  int32_t      batchSize;    // for parameter ('?') binding and batch processing
  int32_t      numOfParams;
245 246 247 248

  int8_t       dataSourceType;     // load data from file or not
  int8_t       submitSchema;  // submit block is built with table schema
  SHashObj    *pTableList;   // referred table involved in sql
249
  SArray      *pDataBlocks;  // SArray<STableDataBlocks*> submit data blocks after parsing sql
H
hzcheng 已提交
250 251 252 253 254 255 256 257
} SSqlCmd;

/* pair of row counters; SSqlRes.pGroupRec points to records of this type */
typedef struct SResRec {
  int numOfRows;   // number of rows
  int numOfTotal;  // total number
} SResRec;

typedef struct {
H
hjxilinx 已提交
258 259
  int64_t               numOfRows;                  // num of results in current retrieved
  int64_t               numOfTotal;                 // num of total results
H
Haojun Liao 已提交
260
  int64_t               numOfClauseTotal;           // num of total result in current subclause
H
hjxilinx 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  char *                pRsp;
  int32_t               rspType;
  int32_t               rspLen;
  uint64_t              qhandle;
  int64_t               uid;
  int64_t               useconds;
  int64_t               offset;  // offset value from vnode during projection query of stable
  int32_t               row;
  int16_t               numOfCols;
  int16_t               precision;
  bool                  completed;
  int32_t               code;
  int32_t               numOfGroups;
  SResRec *             pGroupRec;
  char *                data;
  void **               tsrow;
277
  int32_t*              length;  // length for each field for current row
H
hjxilinx 已提交
278 279
  char **               buffer;  // Buffer used to put multibytes encoded using unicode (wchar_t)
  SColumnIndex *        pColumnIndex;
280 281
  SArithmeticSupport*   pArithSup;   // support the arithmetic expression calculation on agg functions
  
H
hzcheng 已提交
282 283 284
  struct SLocalReducer *pLocalReducer;
} SSqlRes;

typedef struct STscObj {
H
hjxilinx 已提交
286 287 288 289
  void *             signature;
  void *             pTimer;
  char               user[TSDB_USER_LEN];
  char               pass[TSDB_KEY_LEN];
B
Bomin Zhang 已提交
290
  char               acctId[TSDB_ACCT_LEN];
B
Bomin Zhang 已提交
291
  char               db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
H
hjxilinx 已提交
292 293 294
  char               sversion[TSDB_VERSION_LEN];
  char               writeAuth : 1;
  char               superAuth : 1;
S
Shengliang Guan 已提交
295
  uint32_t           connId;
H
hjxilinx 已提交
296 297
  struct SSqlObj *   pHb;
  struct SSqlObj *   sqlList;
H
hjxilinx 已提交
298
  struct SSqlStream *streamList;
299
  void*              pDnodeConn;
H
hjxilinx 已提交
300
  pthread_mutex_t    mutex;
H
hzcheng 已提交
301 302
} STscObj;

typedef struct SSqlObj {
304 305
  void            *signature;
  STscObj         *pTscObj;
H
Haojun Liao 已提交
306
  void            *pRpcCtx;
307 308 309
  void            (*fp)();
  void            (*fetchFp)();
  void            *param;
H
hjxilinx 已提交
310 311 312 313 314 315 316
  int64_t          stime;
  uint32_t         queryId;
  void *           pStream;
  void *           pSubscription;
  char *           sqlstr;
  char             retry;
  char             maxRetry;
317
  SRpcEpSet        epSet;
H
Haojun Liao 已提交
318
  char             listed;
H
hjxilinx 已提交
319 320 321
  tsem_t           rspSem;
  SSqlCmd          cmd;
  SSqlRes          res;
H
Haojun Liao 已提交
322
  uint16_t         numOfSubs;
323 324
  struct SSqlObj **pSubs;
  struct SSqlObj * prev, *next;
H
hzcheng 已提交
325 326
} SSqlObj;

typedef struct SSqlStream {
H
hzcheng 已提交
328 329 330
  SSqlObj *pSql;
  uint32_t streamId;
  char     listed;
331 332
  bool     isProject;
  int16_t  precision;
H
hzcheng 已提交
333 334 335
  int64_t  num;  // number of computing count

  /*
S
slguan 已提交
336
   * keep the number of current result in computing,
H
hzcheng 已提交
337 338 339 340 341 342 343
   * the value will be set to 0 before set timer for next computing
   */
  int64_t numOfRes;

  int64_t useconds;  // total  elapsed time
  int64_t ctime;     // stream created time
  int64_t stime;     // stream next executed time
S
slguan 已提交
344
  int64_t etime;     // stream end query time, when time is larger then etime, the stream will be closed
H
hzcheng 已提交
345 346 347 348 349 350 351
  int64_t interval;
  int64_t slidingTime;
  void *  pTimer;

  void (*fp)();
  void *param;

S
slguan 已提交
352
  void (*callback)(void *);  // Callback function when stream is stopped from client level
H
hjxilinx 已提交
353
  struct SSqlStream *prev, *next;
H
hzcheng 已提交
354 355
} SSqlStream;

int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
H
hjxilinx 已提交
357
void    tscInitMsgsFp();
S
slguan 已提交
358

H
Haojun Liao 已提交
359
int tsParseSql(SSqlObj *pSql, bool initial);
H
hzcheng 已提交
360

361
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
S
slguan 已提交
362
int  tscProcessSql(SSqlObj *pSql);
H
hzcheng 已提交
363

H
Haojun Liao 已提交
364
int  tscRenewTableMeta(SSqlObj *pSql, char *tableId);
H
hzcheng 已提交
365
void tscQueueAsyncRes(SSqlObj *pSql);
S
slguan 已提交
366

H
[td-99]  
hjxilinx 已提交
367
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
H
hzcheng 已提交
368 369 370 371 372

int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg);
int taos_retrieve(TAOS_RES *res);

H
hjxilinx 已提交
373 374
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
void    tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
H
hzcheng 已提交
375

376
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
H
hjxilinx 已提交
377
void    tscDestroyResPointerInfo(SSqlRes *pRes);
H
hzcheng 已提交
378

379
void tscResetSqlCmdObj(SSqlCmd *pCmd);
/**
 * free query result of the sql object
 * @param pObj
 */
H
hjxilinx 已提交
385
void tscFreeSqlResult(SSqlObj *pSql);
weixin_48148422's avatar
weixin_48148422 已提交
386

H
hzcheng 已提交
387 388 389 390 391
/**
 * only free part of resources allocated during query.
 * Note: this function is multi-thread safe.
 * @param pObj
 */
H
hjxilinx 已提交
392
void tscPartiallyFreeSqlObj(SSqlObj *pObj);
H
hzcheng 已提交
393 394 395 396 397 398 399 400 401 402

/**
 * free sql object, release allocated resource
 * @param pObj  Free metric/meta information, dynamically allocated payload, and
 * response buffer, object itself
 */
void tscFreeSqlObj(SSqlObj *pObj);

void tscCloseTscObj(STscObj *pObj);

TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
                     void *param, void **taos);
H
Hui Li 已提交
405
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
H
hjxilinx 已提交
406 407

void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
408

H
Haojun Liao 已提交
409
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
H
hjxilinx 已提交
410
void tscKillSTableQuery(SSqlObj *pSql);
411
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
H
Haojun Liao 已提交
412
bool tscIsUpdateQuery(SSqlObj* pSql);
413
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
H
Haojun Liao 已提交
414 415

// todo remove this function.
416
bool tscResultsetFetchCompleted(TAOS_RES *result);
H
hjxilinx 已提交
417

418
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
419

H
hjxilinx 已提交
420
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
H
hzcheng 已提交
421

H
hjxilinx 已提交
422
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
//void    tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);

/**
 * Make pRes->tsrow[columnIndex] point to the value of one output column in the
 * current row (pRes->row) and store the length of that value in
 * pRes->length[columnIndex].
 *
 * The value is located at
 *   pRes->data + pSqlExpr->offset * pRes->numOfRows + resBytes * pRes->row
 * where pSqlExpr is taken from the SFieldSupInfo of the column.
 * tsrow[columnIndex] is set to NULL when isNull() reports the value as null.
 *
 * @param pRes        result object; data, numOfRows, row, tsrow and length are used
 * @param pFieldInfo  field info, its pSupportInfo array provides the SFieldSupInfo of the column
 * @param columnIndex index of the output column, not range-checked here
 */
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
  SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
  assert(pInfo->pSqlExpr != NULL);

  int32_t type = pInfo->pSqlExpr->resType;
  int32_t bytes = pInfo->pSqlExpr->resBytes;

  char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
    // variable length value: a length header followed by the payload
    int32_t realLen = varDataLen(pData);
    assert(realLen <= bytes - VARSTR_HEADER_SIZE);

    if (isNull(pData, type)) {
      pRes->tsrow[columnIndex] = NULL;
    } else {
      pRes->tsrow[columnIndex] = ((tstr*)pData)->data;  // skip the length header
    }

    // when the payload does not fill the whole slot, put a 0 right behind it
    if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
      *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
    }

    pRes->length[columnIndex] = realLen;
  } else {
    // fixed length value: the slot size must equal the size of the data type
    assert(bytes == tDataTypeDesc[type].nSize);

    if (isNull(pData, type)) {
      pRes->tsrow[columnIndex] = NULL;
    } else {
      pRes->tsrow[columnIndex] = pData;
    }

    pRes->length[columnIndex] = bytes;
  }
}
extern void *    tscCacheHandle;
extern void *    tscTmr;
extern void *    tscQhandle;
extern int       tscKeepConn[];
extern int       tsInsertHeadSize;
extern int       tscNumOfThreads;
467 468
  
extern SRpcCorEpSet tscMgmtEpSet;
H
hjxilinx 已提交
469 470

extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);

weixin_48148422's avatar
weixin_48148422 已提交
474
int32_t tscCompareTidTags(const void* p1, const void* p2);
H
Haojun Liao 已提交
475
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus
}
#endif

#endif