textbuffer.h 7.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_TEXTBUFFER_H
#define TDENGINE_TEXTBUFFER_H

#ifdef __cplusplus
extern "C" {
#endif

// TODO REFACTOR

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "tutil.h"

// Default page size: 16384 (16k). Per the original note it is larger than the SHistoInfo structure.
#define DEFAULT_PAGE_SIZE 16384  // 16k, larger than the SHistoInfo
// Minimum buffer size: 1 << 19 == 524288 (presumably bytes, i.e. 512 KiB -- confirm against callers).
#define MIN_BUFFER_SIZE (1 << 19)
// Size of the tExtMemBuffer.dataFilePath array that holds the temporary data file path.
#define MAX_TMPFILE_PATH_LENGTH 512
// NOTE(review): not referenced in this header; presumably an initial element count for a
// growable array in the implementation file -- confirm.
#define INITIAL_ALLOCATION_BUFFER_SIZE 64

// forward declare
struct tTagSchema;

/*
 * Describes how the data written by successive flush operations relate to
 * each other once they are on disk. Stored in tExtMemBuffer.flushModel.
 */
typedef enum EXT_BUFFER_FLUSH_MODEL {
  /*
   * All data that has been flushed to disk belongs to the same group,
   * which means all data on disk is sorted, or order does not matter in this case.
   */
  SINGLE_APPEND_MODEL,

  /*
   * Each flush operation to disk is completely independent of any other flush operation.
   * Several sets of data are simply merged into one file to reduce the number of flat files
   * on disk. In this case the flush-out information has to be kept in the tFlushoutInfo
   * structure.
   */
  MULTIPLE_APPEND_MODEL,
} EXT_BUFFER_FLUSH_MODEL;

/*
 * Record of a single flush-out (see MULTIPLE_APPEND_MODEL), expressed as a
 * page range.
 */
typedef struct tFlushoutInfo {
  uint32_t startPageId;  // presumably the id of the first page written by this flush -- confirm
  uint32_t numOfPages;   // presumably the number of pages written by this flush -- confirm
} tFlushoutInfo;

/*
 * Collection of tFlushoutInfo records, embedded in tFileMeta.
 * NOTE(review): the field names suggest a growable array (capacity + used
 * length); confirm units and growth policy in the implementation.
 */
typedef struct tFlushoutData {
  uint32_t       nAllocSize;     // presumably the allocated capacity of pFlushoutInfo -- confirm
  uint32_t       nLength;        // presumably the number of records currently in use -- confirm
  tFlushoutInfo *pFlushoutInfo;  // pointer to the tFlushoutInfo records
} tFlushoutData;

/*
 * Metadata describing the on-disk part of an external memory buffer
 * (embedded in tExtMemBuffer.fileMeta).
 */
typedef struct tFileMeta {
  uint32_t      nFileSize;  // in pages
  uint32_t      nPageSize;         // page size (presumably bytes -- confirm)
  uint32_t      numOfElemsInFile;  // element count (presumably elements already flushed to the file)
  tFlushoutData flushoutData;      // per-flush records, see tFlushoutData
} tFileMeta;

/*
 * One page of data: an element count followed by the raw payload.
 * `data` is a C99 flexible array member, so sizeof(tFilePage) does not include
 * the payload; storage for it has to be provided when the page is allocated.
 */
typedef struct tFilePage {
  uint64_t numOfElems;  // element count for this page
  char     data[];      // payload bytes (flexible array member)
} tFilePage;

/*
 * Singly linked list node wrapping a tFilePage (tExtMemBuffer keeps pHead and
 * pTail pointers of this type).
 *
 * NOTE(review): `item` is a struct ending in a flexible array member that is
 * itself embedded as the last member of another struct. ISO C does not allow
 * this (C11 6.7.2.1p3); it is accepted as a compiler extension. `item` must
 * remain the last member.
 */
typedef struct tFilePagesItem {
  struct tFilePagesItem *pNext;  // next node in the list
  tFilePage              item;   // the page itself; payload follows the node in memory
} tFilePagesItem;

/*
 * Column format model: a set of column schemas plus per-column offsets.
 * Created with tColModelCreate() and released with tColModelDestroy().
 */
typedef struct tColModel {
  int32_t         maxCapacity;  // presumably the maximum number of rows a block using this model can hold -- confirm
  int32_t         numOfCols;    // number of columns
  int16_t *       colOffset;    // per-column offsets (presumably numOfCols entries -- confirm)
  struct SSchema *pFields;      // column schemas (presumably numOfCols entries -- confirm)
} tColModel;

/*
 * List of the columns used for ordering. `pData` is a flexible array member,
 * so its storage must be allocated together with the enclosing object.
 */
typedef struct tOrderIdx {
  int32_t numOfOrderedCols;  // number of entries in pData
  int16_t pData[];           // presumably the indexes of the ordered columns -- confirm
} tOrderIdx;

/*
 * Describes how data is ordered: the schema it refers to, the timestamp order
 * and the list of ordering columns.
 *
 * NOTE(review): `orderIdx` ends in a flexible array member and is embedded
 * here as the last member, which relies on a compiler extension
 * (C11 6.7.2.1p3); it must remain the last member. The anonymous union
 * requires C11 or a compiler extension.
 */
typedef struct tOrderDescriptor {
  // Exactly one of the two pointers is meaningful; which one is not recorded
  // in this struct (presumably known from the calling context -- confirm).
  union {
    struct tTagSchema *pTagSchema;
    tColModel *        pSchema;
  };
  int32_t   tsOrder;  // timestamp order type if exists
  tOrderIdx orderIdx;  // ordering columns; variable-length tail
} tOrderDescriptor;

/*
 * External memory buffer: a list of in-memory pages backed by a temporary
 * data file. Created with tExtMemBufferCreate() and released with
 * tExtMemBufferDestroy().
 */
typedef struct tExtMemBuffer {
  int32_t nMaxSizeInPages;  // maximum size, in pages

  int32_t nElemSize;  // size of one element (presumably bytes -- confirm)
  int32_t nPageSize;  // size of one page (presumably bytes -- confirm)

  int32_t numOfAllElems;       // presumably total elements, in memory plus on disk -- confirm
  int32_t numOfElemsInBuffer;  // presumably elements currently held in memory -- confirm
  int32_t numOfElemsPerPage;   // number of elements per page

  int16_t         numOfPagesInMem;  // number of pages currently in memory
  tFilePagesItem *pHead;            // first node of the in-memory page list
  tFilePagesItem *pTail;            // last node of the in-memory page list

  tFileMeta fileMeta;  // metadata of the on-disk data

  char  dataFilePath[MAX_TMPFILE_PATH_LENGTH];  // path of the temporary data file
  FILE *dataFile;                               // stream for the temporary data file

  tColModel *pColModel;  // column model of the stored data (ownership not visible here)

  EXT_BUFFER_FLUSH_MODEL flushModel;  // see EXT_BUFFER_FLUSH_MODEL
} tExtMemBuffer;

/*
 * Writes a temporary file path into dstPath.
 * NOTE(review): presumably the path is derived from fileNamePattern,
 * serialNumber, seg and slot -- confirm in the implementation. No destination
 * size is passed, so the caller must supply a sufficiently large buffer
 * (presumably MAX_TMPFILE_PATH_LENGTH bytes -- confirm).
 */
void getExtTmpfilePath(const char *fileNamePattern, int64_t serialNumber, int32_t seg, int32_t slot, char *dstPath);

/*
 * Create an ext-memory buffer.
 *
 * @param pMemBuffer       out: receives the newly created buffer
 * @param numOfBufferSize  buffer size (presumably bytes -- confirm)
 * @param elemSize         size of one element
 * @param tmpDataFilePath  path of the temporary data file
 * @param pModel           column format model of the stored data
 *
 * NOTE(review): the function returns void, so failure is presumably reported
 * through *pMemBuffer -- confirm in the implementation.
 */
void tExtMemBufferCreate(tExtMemBuffer **pMemBuffer, int32_t numOfBufferSize, int32_t elemSize,
                         const char *tmpDataFilePath, tColModel *pModel);

/*
 * Destroy an ext-memory buffer.
 *
 * @param pMemBuffer  address of the buffer pointer (presumably reset to NULL
 *                    after the buffer is released -- confirm)
 */
void tExtMemBufferDestroy(tExtMemBuffer **pMemBuffer);

/*
 * Put rows into the ext-memory buffer.
 *
 * @param pMemBuffer target buffer
 * @param data       input data pointer
 * @param numOfRows  number of rows in data
 * @return           number of pages in memory
 */
int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRows);

/*
 * Flush all data to disk and release all in-memory buffers.
 *
 * @return presumably true on success -- confirm in the implementation
 */
bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer);

/*
 * Remove all data that has been put into the buffer, whether it is held in
 * memory or in the ext-buffer (disk).
 */
void tExtMemBufferClear(tExtMemBuffer *pMemBuffer);

/*
 * Load one page into pFilePage.
 *
 * This function should be removed: flushing to disk is meant to be
 * transparent to the client, so this structure should instead provide a
 * stream-style interface with an internal cursor pointing to the data.
 *
 * @param pMemBuffer source buffer
 * @param pFilePage  destination page
 * @param flushIdx   presumably the index of the flush-out to read from -- confirm
 * @param pageIdx    presumably the page index within that flush-out -- confirm
 * @return           presumably true on success -- confirm
 */
bool tExtMemBufferLoadData(tExtMemBuffer *pMemBuffer, tFilePage *pFilePage, int32_t flushIdx, int32_t pageIdx);

/*
 * Report whether all data of the buffer is held in memory
 * (presumably meaning nothing has been flushed to disk -- confirm).
 */
bool tExtMemBufferIsAllDataInMem(tExtMemBuffer *pMemBuffer);

/*
 * Create a column format model.
 *
 * @param field       column schemas
 * @param numOfCols   number of columns (presumably the number of entries in field -- confirm)
 * @param maxCapacity value for tColModel.maxCapacity
 * @return            the new model; release it with tColModelDestroy()
 *
 * NOTE(review): SSchema is not declared in this header; it is presumably
 * provided by "tutil.h" or must be included beforehand -- confirm.
 */
tColModel *tColModelCreate(SSchema *field, int32_t numOfCols, int32_t maxCapacity);

/*
 * Destroy a column format model created by tColModelCreate().
 */
void tColModelDestroy(tColModel *pModel);

/*
 * Per-column source information passed to tColModelDisplayEx().
 */
typedef struct SSrcColumnInfo {
  int32_t functionId;  // presumably the id of the function applied to the column -- confirm
  int32_t type;        // presumably the data type of the source column -- confirm
} SSrcColumnInfo;

/*
 * Display data in column format model; for debug purposes only.
 *
 * @param pModel    column format model describing pData
 * @param pData     data to display
 * @param numOfRows number of rows in pData
 * @param maxCount  presumably the row capacity of the pData block -- confirm
 */
void tColModelDisplay(tColModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount);

/*
 * Variant of tColModelDisplay() that additionally takes per-column source
 * information (pInfo); for debug purposes only.
 */
void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount, SSrcColumnInfo *pInfo);

/*
 * Compress data into a consecutive block without holes in the data.
 *
 * @param pModel           column format model of inputBuffer
 * @param inputBuffer      page to compact (presumably modified in place -- confirm)
 * @param maxElemsCapacity presumably the row capacity the page was laid out for -- confirm
 */
void tColModelCompress(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);

/*
 * Erase a range of rows from a page.
 *
 * @param pModel      column format model of inputBuffer
 * @param inputBuffer page to modify
 * @param maxCapacity presumably the row capacity of the page -- confirm
 * @param s           start of the range to erase
 * @param e           end of the range to erase (NOTE(review): inclusive or
 *                    exclusive is not visible here -- confirm)
 */
void tColModelErase(tColModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);

/*
 * Create an order descriptor.
 *
 * @param orderColIdx    indexes of the ordering columns
 * @param numOfOrderCols number of entries in orderColIdx
 * @param pModel         column format model the descriptor refers to
 * @param tsOrderType    timestamp order type
 * @return               the new descriptor; release it with tOrderDescDestroy()
 *
 * NOTE(review): the create function is spelled "OrderDes" while the destroy
 * function is spelled "OrderDesc"; the names are kept as they are because
 * callers depend on them.
 */
tOrderDescriptor *tOrderDesCreate(int32_t *orderColIdx, int32_t numOfOrderCols, tColModel *pModel, int32_t tsOrderType);

/*
 * Destroy an order descriptor created by tOrderDesCreate().
 */
void tOrderDescDestroy(tOrderDescriptor *pDesc);

/*
 * Append rows from srcData to dstPage.
 *
 * @param dstModel         column format model of the destination page
 * @param dstPage          destination page
 * @param srcData          source data
 * @param srcStartRows     presumably the index of the first source row to copy -- confirm
 * @param numOfRowsToWrite number of rows to append
 * @param srcCapacity      presumably the row capacity of the source block -- confirm
 */
void tColModelAppend(tColModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows,
                     int32_t numOfRowsToWrite, int32_t srcCapacity);

///////////////////////////////////////////////////////////////////////////////////////////////////////
/*
 * A [min, max] pair whose values can be read as double, int32_t or int64_t.
 * Each bound is an anonymous union (C11 or compiler extension); the struct
 * does not record which member is active, so that is presumably determined
 * by the data type of the owner (e.g. tMemBucket.dataType) -- confirm.
 */
typedef struct MinMaxEntry {
  // lower bound
  union {
    double  dMinVal;
    int32_t iMinVal;
    int64_t i64MinVal;
  };
  // upper bound
  union {
    double  dMaxVal;
    int32_t iMaxVal;
    int64_t i64MaxVal;
  };
} MinMaxEntry;

/*
 * One segment of a tMemBucket: a group of slots, each with a value range and
 * an ext-memory buffer.
 */
typedef struct tMemBucketSegment {
  int32_t         numOfSlots;        // number of slots in this segment
  MinMaxEntry *   pBoundingEntries;  // presumably one min/max range per slot -- confirm
  tExtMemBuffer **pBuffer;           // presumably one buffer pointer per slot -- confirm
} tMemBucketSegment;

/*
 * Bucket structure used by getPercentile(): values are distributed over
 * segments and slots by HashFunc. Created with tMemBucketCreate() and
 * released with tMemBucketDestroy().
 */
typedef struct tMemBucket {
  int16_t numOfSegs;    // number of segments in pSegs
  int16_t nTotalSlots;  // total number of slots
  int16_t nSlotsOfSeg;  // number of slots per segment
  int16_t dataType;     // data type of the stored values

  int16_t nElemSize;   // size of one element
  int32_t numOfElems;  // number of elements stored

  int32_t nTotalBufferSize;  // total buffer size (presumably bytes -- confirm)
  int32_t maxElemsCapacity;  // maximum number of elements (scope not visible here -- confirm)

  int16_t nPageSize;        // page size; NOTE(review): int16_t caps this at 32767
  int16_t numOfTotalPages;  // total number of buffer pages
  int16_t numOfAvailPages; /* remain available buffer pages */

  tMemBucketSegment *pSegs;       // segments
  tOrderDescriptor * pOrderDesc;  // order descriptor (ownership not visible here)

  MinMaxEntry nRange;  // presumably the overall value range of the bucket -- confirm

  // Maps a value to a segment index and slot index; tBucketIntHash and
  // tBucketDoubleHash declared in this header have a matching signature.
  void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
} tMemBucket;

/*
 * Comparator over two rows (idx1, idx2) of a single data block.
 *
 * NOTE(review): identifiers containing a double underscore are reserved for
 * the implementation (C11 7.1.3); the name is kept because callers depend on
 * it. The typedef returns `int` whereas compare_sa/compare_sd return
 * `int32_t`; these are the same type only where int32_t is a typedef of int.
 */
typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);

/*
 * Quick sort of the rows in [start, end] of a column-format data block.
 * NOTE(review): whether `end` is inclusive, and the valid values of
 * orderType, are not visible here -- confirm in the implementation.
 */
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);

/*
 * Compare rows idx1 and idx2 of one data block
 * (presumably "sa" = single block, ascending -- confirm).
 */
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);

/*
 * Compare rows idx1 and idx2 of one data block
 * (presumably "sd" = single block, descending -- confirm).
 */
int32_t compare_sd(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);

/*
 * Compare row s1 of data1 with row s2 of data2, i.e. rows from two data
 * blocks (presumably ascending order -- confirm).
 */
int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2,
                  char *data2);

/*
 * Compare row s1 of data1 with row s2 of data2, i.e. rows from two data
 * blocks (presumably descending order -- confirm).
 */
int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2,
                  char *data2);

/*
 * Create a memory bucket.
 *
 * @param pBucket     out: receives the newly created bucket
 * @param totalSlots  total number of slots
 * @param nBufferSize buffer size (presumably bytes -- confirm)
 * @param nElemSize   size of one element
 * @param dataType    data type of the values
 * @param pDesc       order descriptor (ownership not visible here -- confirm)
 */
void tMemBucketCreate(tMemBucket **pBucket, int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize,
                      int16_t dataType, tOrderDescriptor *pDesc);

/*
 * Destroy a memory bucket created by tMemBucketCreate().
 */
void tMemBucketDestroy(tMemBucket **pBucket);

/*
 * Put numOfRows values from data into the bucket.
 */
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows);

/*
 * Compute a percentile over the values stored in the bucket.
 * NOTE(review): the expected range of `percent` (0-1 or 0-100) is not visible
 * here -- confirm in the implementation.
 */
double getPercentile(tMemBucket *pMemBucket, double percent);

/*
 * Hash function for integer values; signature matches tMemBucket.HashFunc.
 * Writes the resulting segment and slot indexes to segIdx and slotIdx.
 */
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);

/*
 * Hash function for double values; signature matches tMemBucket.HashFunc.
 * Writes the resulting segment and slot indexes to segIdx and slotIdx.
 */
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_TEXTBUFFER_H