tsdb.h 13.0 KB
Newer Older
H
more  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

H
Hongze Cheng 已提交
22
#include "common.h"
H
more  
Hongze Cheng 已提交
23 24 25
#include "taosdef.h"
#include "tarray.h"
#include "tdataformat.h"
H
more  
Hongze Cheng 已提交
26
#include "thash.h"
H
more  
Hongze Cheng 已提交
27
#include "tlist.h"
H
Hongze Cheng 已提交
28 29 30
#include "tlockfree.h"
#include "tmsg.h"
#include "tname.h"
H
more  
Hongze Cheng 已提交
31 32 33 34 35 36 37 38 39 40 41 42

#ifdef __cplusplus
extern "C" {
#endif

// TSDB version numbers.
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0

// Marks an invalid super table id. Parenthesized so the negative literal is safe in any expression context.
#define TSDB_INVALID_SUPER_TABLE_ID (-1)

// Commit status codes (presumably the `status` values handed to STsdbAppH.notifyStatus -- confirm).
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3  // commit no block; marked by the original author as still needing to be solved

// TSDB STATE DEFINITION
// NOTE(review): the non-zero values look like combinable bit flags -- confirm against users of tsdbGetState().
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2

H
more  
Hongze Cheng 已提交
50 51 52 53 54 55 56 57 58 59
/*
 * Pre-calculated statistics record; tsdbRetrieveDataBlockStatisInfo() hands these out for the current data block.
 * NOTE(review): field meanings below are inferred from the names only -- confirm against the producer.
 */
typedef struct SDataStatis {
  int16_t colId;      // id of the column the record refers to
  int64_t sum;        // presumably the sum of the column values
  int64_t max;        // presumably the maximum column value
  int64_t min;        // presumably the minimum column value
  int16_t maxIndex;   // presumably the position of the maximum value -- confirm
  int16_t minIndex;   // presumably the position of the minimum value -- confirm
  int16_t numOfNull;  // presumably the number of NULL values
} SDataStatis;

H
more  
Hongze Cheng 已提交
60 61 62 63 64 65
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
  void *appH;
  void *cqH;
  int (*notifyStatus)(void *, int status, int eno);
  int (*eventCallBack)(void *);
H
Hongze Cheng 已提交
66 67
  void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema,
                        int start);
H
more  
Hongze Cheng 已提交
68 69 70 71 72 73 74 75 76 77 78 79
  void (*cqDropFunc)(void *handle);
} STsdbAppH;

// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
/*
 * Configuration of one TSDB repository; passed to tsdbOpen() and tsdbConfigRepo().
 * Fields without a comment are not described anywhere in this header.
 */
typedef struct {
  int32_t tsdbId;
  int32_t cacheBlockSize;
  int32_t totalBlocks;
  int32_t daysPerFile;  // days per file (sharding policy)
  int32_t keep;         // days of data to keep
  int32_t keep1;
  int32_t keep2;
  int32_t lruCacheSize;
  int32_t minRowsPerFileBlock;  // minimum rows per file block
  int32_t maxRowsPerFileBlock;  // maximum rows per file block
  int8_t  precision;
  int8_t  compression;
  int8_t  update;
  int8_t  cacheLastRow;  // 0: no cache, 1: cache last row, 2: cache last NULL column, 3: both 1 and 2
} STsdbCfg;

H
Hongze Cheng 已提交
89 90
// Predicates over the `cacheLastRow` setting; `c` must be a pointer to a struct that has that field.
// Value 1 (bit 0) selects last-row caching, value 2 (bit 1) selects last-NULL-column caching, 0 disables both.
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)

// --------- TSDB REPOSITORY USAGE STATISTICS
// Usage counters of a repository; the same three quantities are reported by tsdbReportStat().
typedef struct {
  int64_t totalStorage;  // total bytes occupied
  int64_t compStorage;   // presumably the bytes occupied after compression -- confirm
  int64_t pointsWritten;  // total data points written
} STsdbStat;

H
Hongze Cheng 已提交
100
// TSDB repository handle. Only forward-declared in this header, so callers treat it as an opaque type.
typedef struct STsdb STsdb;
H
more  
Hongze Cheng 已提交
101

H
Hongze Cheng 已提交
102
// Returns a pointer to the configuration of `repo`.
// NOTE(review): ownership and lifetime of the returned pointer are not stated in this header.
STsdbCfg *tsdbGetCfg(const STsdb *repo);
H
more  
Hongze Cheng 已提交
103 104

// --------- TSDB REPOSITORY DEFINITION
H
Hongze Cheng 已提交
105 106
// int32_t tsdbCreateRepo(int repoid);
// int32_t tsdbDropRepo(int repoid);
H
more  
Hongze Cheng 已提交
107 108
// tsdbOpen: obtain a repository handle for configuration `pCfg` and application callbacks `pAppH`.
//           The failure value is presumably NULL -- confirm.
// tsdbClose: release `repo`; `toCommit` presumably requests a commit before closing -- confirm.
STsdb * tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH);
int     tsdbClose(STsdb *repo, int toCommit);
H
Hongze Cheng 已提交
109 110 111
// tsdbConfigRepo: apply configuration `pCfg` to an existing repository handle.
// tsdbGetState: query the repository state; presumably expressed with the TSDB_STATE_* values -- confirm.
// tsdbGetCompactState: query the compaction state; value encoding is not described in this header.
int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg);
int     tsdbGetState(STsdb *repo);
int8_t  tsdbGetCompactState(STsdb *repo);
H
more  
Hongze Cheng 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// --------- TSDB TABLE DEFINITION
// Identifies a table by a unique id plus its id inside one repository.
typedef struct {
  uint64_t uid;  // the unique table ID
  int32_t  tid;  // the table ID in the repository.
} STableId;

// --------- TSDB TABLE configuration
// Description of a table to create; consumed by tsdbCreateTable() and released with tsdbClearTableCfg().
// NOTE(review): which of the pointer members are owned by the struct is not visible here.
typedef struct {
  ETableType type;       // kind of table (enum declared elsewhere)
  char *     name;       // table name
  STableId   tableId;    // uid/tid pair of the table
  int32_t    sversion;   // presumably the schema version -- confirm
  char *     sname;  // super table name
  uint64_t   superUid;   // presumably the uid of the super table -- confirm
  STSchema * schema;     // column schema
  STSchema * tagSchema;  // tag schema
  SKVRow     tagValues;  // tag values
  char *     sql;        // SQL text associated with the table; exact use not visible here
} STableCfg;

// Releases a table configuration. NOTE(review): whether `config` itself is freed is not visible here.
void tsdbClearTableCfg(STableCfg *config);

// Accessors on an opaque table object (`pTable`).
// tsdbGetTableTagVal: value of the tag column `colId`, interpreted with data type `type`.
// tsdbGetTableName: name of the table. Lifetime of both returned pointers is not stated in this header.
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);

H
Hongze Cheng 已提交
137 138 139
// Reinterprets a table pointer as STableId*.
// NOTE(review): this only works if the table object begins with an STableId -- confirm against the table struct.
#define TSDB_TABLEID(_table) ((STableId *)(_table))
// Row direction selectors; where they are consumed is not visible in this header.
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
H
more  
Hongze Cheng 已提交
140 141 142

// Builds an STableCfg from a create-table message.
// NOTE(review): the result is presumably heap-allocated and released via tsdbClearTableCfg() -- confirm.
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);

H
Hongze Cheng 已提交
143 144 145
// Table management on a repository: create from `pCfg`, drop by id, update a tag value from a message.
// Return-value convention is not documented in this header.
int tsdbCreateTable(STsdb *repo, STableCfg *pCfg);
int tsdbDropTable(STsdb *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdb *repo, SUpdateTableTagValMsg *pMsg);
H
more  
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147
// Retrieves information about a repository file.
// NOTE(review): `name`, `index` and `size` are non-const pointers and may be written by the callee; the exact
// in/out roles, the meaning of `eindex` and of the return value are not visible in this header.
uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
H
more  
Hongze Cheng 已提交
148 149 150 151 152 153 154 155 156

// the TSDB repository info, as returned by tsdbGetStatus()
typedef struct STsdbRepoInfo {
  STsdbCfg tsdbCfg;            // configuration of the repository
  uint64_t version;            // version of the repository
  int64_t  tsdbTotalDataSize;  // the original inserted data size
  int64_t  tsdbTotalDiskSize;  // the total disk size taken by this TSDB repository
  // TODO: other information to add
} STsdbRepoInfo;
H
Hongze Cheng 已提交
157
// Returns the repository info of `pRepo`. NOTE(review): ownership of the returned pointer is not stated here.
STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo);
H
more  
Hongze Cheng 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174

// the meter information report structure ("meter" presumably means table here -- confirm)
typedef struct {
  STableCfg tableCfg;            // configuration of the table
  uint64_t  version;             // version value; what it counts is not described in this header
  int64_t   tableTotalDataSize;  // In bytes
  int64_t   tableTotalDiskSize;  // In bytes
} STableInfo;

// -- FOR INSERT DATA
/**
 * Insert data to a table in a repository
 * @param pRepo the TSDB repository handle
 * @param pData the data to insert (will give a more specific description)
 *
 * @return the number of points inserted, -1 for failure and the error number is set
 */
H
Hongze Cheng 已提交
175
int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
H
more  
Hongze Cheng 已提交
176 177 178 179 180

// -- FOR QUERY TIME SERIES DATA

typedef void *TsdbQueryHandleT;  // Use void to hide implementation details

H
Hongze Cheng 已提交
181 182 183
// Data block load orders. NOTE(review): presumably the values of STsdbQueryCond.type ("data block load type")
// -- confirm against the query code.
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
H
more  
Hongze Cheng 已提交
184 185 186 187

// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
  STimeWindow  twindow;
H
Hongze Cheng 已提交
188 189
  int32_t      order;   // desc|asc order to iterate the data block
  int64_t      offset;  // skip offset put down to tsdb
H
more  
Hongze Cheng 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
  int32_t      numOfCols;
  SColumnInfo *colList;
  bool         loadExternalRows;  // load external rows or not
  int32_t      type;              // data block load type:
} STsdbQueryCond;

// Per-table in-memory data; only forward-declared in this header.
typedef struct STableData STableData;
// In-memory table. NOTE(review): field notes are inferred from names and types -- confirm against the
// memtable implementation.
typedef struct {
  T_REF_DECLARE()             // macro defined elsewhere; presumably declares reference-count state
  SRWLatch     latch;         // latch guarding the structure (type declared elsewhere)
  TSKEY        keyFirst;      // presumably the smallest key held
  TSKEY        keyLast;       // presumably the largest key held
  int64_t      numOfRows;     // number of rows held
  int32_t      maxTables;     // presumably the capacity of tData
  STableData **tData;         // array of per-table data pointers
  SList *      actList;
  SList *      extraBuffList;
  SList *      bufBlockList;
  int64_t      pointsAdd;   // TODO
  int64_t      storageAdd;  // TODO
} SMemTable;

typedef struct {
H
Hongze Cheng 已提交
213 214
  SMemTable *mem;
  SMemTable *imem;
H
more  
Hongze Cheng 已提交
215
  SMemTable  mtable;
H
Hongze Cheng 已提交
216
  SMemTable *omem;
H
more  
Hongze Cheng 已提交
217 218 219 220 221 222 223
} SMemSnapshot;

// Memtable snapshot paired with a counter; passed as `pRef` / `pMemRef` to the tsdbQuery* functions.
typedef struct SMemRef {
  int32_t      ref;       // presumably a reference count -- confirm
  SMemSnapshot snapshot;  // the memtable snapshot
} SMemRef;

H
Hongze Cheng 已提交
224
// NOTE: everything from here to the matching #endif (just before the closing extern "C" guard) is compiled out.
#if 0
H
more  
Hongze Cheng 已提交
225 226 227 228 229 230 231 232 233 234 235
// One entry of file block distribution info.
// NOTE(review): "step" presumably relates to TSDB_BLOCK_DIST_STEP_ROWS -- confirm.
typedef struct SFileBlockInfo {
  int32_t numBlocksOfStep;
} SFileBlockInfo;

// Pairs an opaque table object with a key.
typedef struct {
  void *pTable;   // opaque table object
  TSKEY lastKey;  // presumably the last key processed for this table -- confirm
} STableKeyInfo;

typedef struct {
  uint32_t  numOfTables;
H
Hongze Cheng 已提交
236
  SArray *  pGroupList;
H
more  
Hongze Cheng 已提交
237 238 239 240 241
  SHashObj *map;  // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;

#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
H
Hongze Cheng 已提交
242 243 244 245 246 247 248 249 250 251 252
  uint16_t rowSize;
  uint16_t numOfFiles;
  uint32_t numOfTables;
  uint64_t totalSize;
  uint64_t totalRows;
  int32_t  maxRows;
  int32_t  minRows;
  int32_t  firstSeekTimeUs;
  uint32_t numOfRowsInMemTable;
  uint32_t numOfSmallBlocks;
  SArray * dataBlockInfos;
H
more  
Hongze Cheng 已提交
253 254 255 256 257 258 259 260 261 262 263 264
} STableBlockDist;

/**
 * Get the data block iterator, starting from position according to the query condition
 *
 * @param tsdb       tsdb handle
 * @param pCond      query condition, including time window, result set order, and basic required columns for each block
 * @param tableInfoGroup  table object list in the form of set, grouped into different sets according to the
 *                        group by condition
 * @param qinfo      query info handle from query processor
 * @return
 */
H
Hongze Cheng 已提交
265
TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
H
more  
Hongze Cheng 已提交
266 267 268 269 270 271 272 273 274 275 276 277
                                  SMemRef *pRef);

/**
 * Get the last row of the given query time window for all the tables in STableGroupInfo object.
 * Note that only one data block with only row will be returned while invoking retrieve data block function for
 * all tables in this group.
 *
 * @param tsdb   tsdb handle
 * @param pCond  query condition, including time window, result set order, and basic required columns for each block
 * @param tableInfo  table list.
 * @return
 */
H
Hongze Cheng 已提交
278
TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
H
more  
Hongze Cheng 已提交
279 280
                                  SMemRef *pRef);

H
Hongze Cheng 已提交
281 282
// Creates a query handle with the same parameter list as tsdbQueryLastRow().
// NOTE(review): presumably served from the cached last row / last column data (see STsdbCfg.cacheLastRow) -- confirm.
TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
                                    SMemRef *pMemRef);
H
more  
Hongze Cheng 已提交
283

H
Hongze Cheng 已提交
284
// Predicate on a query handle; presumably true when the handle is a cached-last-row query -- confirm.
bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle);
H
more  
Hongze Cheng 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300

/**
 * get the queried table object list
 * @param pHandle  query handle
 * @return         array of the queried table objects; element type and ownership are not stated in this header
 */
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);

/**
 * get the group list according to table id from client
 * @param tsdb
 * @param pCond
 * @param groupList
 * @param qinfo
 * @return
 */
H
Hongze Cheng 已提交
301
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
H
more  
Hongze Cheng 已提交
302 303 304
                                               uint64_t qId, SMemRef *pRef);

/**
H
Hongze Cheng 已提交
305
 * get num of rows in mem table
H
more  
Hongze Cheng 已提交
306 307 308 309 310
 *
 * @param pHandle
 * @return row size
 */

H
Hongze Cheng 已提交
311
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle);
H
more  
Hongze Cheng 已提交
312 313

/**
 * move to next block if exists
 *
 * @param pQueryHandle  query handle
 * @return presumably true when the handle now points at a next block, false when there is none -- confirm
 */
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);

/**
 * Get current data block information
 *
 * @param pQueryHandle  query handle
 * @param pBlockInfo    output: information about the current data block
 */
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);

/**
 *
 * Get the pre-calculated information w.r.t. current data block.
 *
 * In case of data block in cache, the pBlockStatis will always be NULL.
 * If a block is not completely loaded from disk, the pBlockStatis will be NULL.
 *
 * @param pQueryHandle  query handle
 * @param pBlockStatis  output: the pre-calculated values for the current data block; NULL for a cache block
 * @return              status code; value convention is not documented in this header
 */
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis);

/**
 *
 * The query condition with primary timestamp is passed to the iterator when it is constructed,
 * so the returned data block always satisfies the time window condition,
 * which means the returned data block is not necessarily a complete on-disk data block.
 *
 * @param pQueryHandle      query handle
 * @param pColumnIdList     required data columns id list
 * @return                  array holding the requested column data; element type is not stated in this header
 */
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);

/**
 * Get the qualified table id for a super table according to the tag query expression.
 * @param stableid. super table sid
 * @param pTagCond. tag query condition
 */
H
Hongze Cheng 已提交
359
int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
H
more  
Hongze Cheng 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
                                 STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols);

/**
 * destroy the created table group list, which is generated by tag query
 * @param pGroupList  the group list to destroy
 */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);

/**
 * create the table group result including only one table, used to handle the normal table query
 *
 * @param tsdb        tsdbHandle
 * @param uid         table uid
 * @param pGroupInfo  the generated result
 * @return
 */
H
Hongze Cheng 已提交
376
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
H
more  
Hongze Cheng 已提交
377 378 379 380 381 382 383 384

/**
 *
 * @param tsdb
 * @param pTableIdList
 * @param pGroupInfo
 * @return
 */
H
Hongze Cheng 已提交
385
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
H
more  
Hongze Cheng 已提交
386 387 388 389 390 391 392 393 394

/**
 * clean up the query handle
 * @param queryHandle  the handle to clean up
 */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);

// Re-arms an existing query handle with a new query condition.
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);

H
Hongze Cheng 已提交
395
// Like tsdbResetQueryHandle(), but additionally takes the table groups to use.
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList);
H
more  
Hongze Cheng 已提交
396

H
Hongze Cheng 已提交
397
// Fills `pTableBlockInfo` with block distribution information for the tables behind `queryHandle`.
// Return-value convention is not documented in this header.
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo);
H
more  
Hongze Cheng 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412

// obtain queryHandle attribute: the skip offset (presumably the one given via STsdbQueryCond.offset -- confirm)
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);

/**
 * get the statistics of repo usage
 * @param repo          pointer to the tsdb repo
 * @param totalPoints   output: total data points written
 * @param totalStorage  output: total bytes taken by the tsdb
 * @param compStorage   output: total bytes taken by the tsdb after compression
 */
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);

// Set up / tear down the commit queue.
// Declared with (void): in C an empty parameter list is not a prototype, so calls with stray arguments
// would otherwise compile without a diagnostic.
int  tsdbInitCommitQueue(void);
void tsdbDestroyCommitQueue(void);
H
Hongze Cheng 已提交
413
// Commits `repo`; presumably blocks until the commit has finished, going by the name -- confirm.
int  tsdbSyncCommit(STsdb *repo);
H
more  
Hongze Cheng 已提交
414 415 416 417 418 419 420 421
// Increment / decrement the commit reference of the vnode group `vgId`.
// NOTE(review): what the reference protects is not visible in this header.
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
// Switches the table the query handle works on; selection rule is not visible in this header.
void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);

// For TSDB file sync: send / receive repository files over the socket `socketFd`.
// `pRepo` is passed as void*; presumably an STsdb handle -- confirm. Return convention is not documented here.
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);

H
Hongze Cheng 已提交
422 423
// // For TSDB Compact
// int tsdbCompact(STsdb *pRepo);
H
more  
Hongze Cheng 已提交
424 425 426

// For TSDB Health Monitor

H
Hongze Cheng 已提交
427 428 429 430
// // no problem return true
// bool tsdbNoProblem(STsdb *pRepo);
// // unit of walSize: MB
// int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
H
more  
Hongze Cheng 已提交
431

H
Hongze Cheng 已提交
432 433 434 435 436
// // for json tag
// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
// void  getJsonTagValueAll(void *data, void *dst, int16_t bytes);
// char *parseTagDatatoJson(void *p);
#endif
H
more  
Hongze Cheng 已提交
437 438 439 440 441 442

#ifdef __cplusplus
}
#endif

#endif  // _TD_TSDB_H_