tstreamUpdate.c 5.8 KB
Newer Older
R
root 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tstreamUpdate.h"
#include "ttime.h"

#define DEFAULT_FALSE_POSITIVE 0.01
L
Liu Jicong 已提交
20 21
#define DEFAULT_BUCKET_SIZE    1024
#define ROWS_PER_MILLISECOND   1
5
54liuyao 已提交
22
#define MAX_NUM_SCALABLE_BF    100000
L
Liu Jicong 已提交
23 24 25 26
#define MIN_NUM_SCALABLE_BF    10
#define DEFAULT_PREADD_BUCKET  1
#define MAX_INTERVAL           MILLISECOND_PER_MINUTE
#define MIN_INTERVAL           (MILLISECOND_PER_SECOND * 10)
5
54liuyao 已提交
27 28 29 30 31
#define DEFAULT_EXPECTED_ENTRIES    10000

static int64_t adjustExpEntries(int64_t entries) {
  return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
}
R
root 已提交
32 33

static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
L
Liu Jicong 已提交
34
  if (pInfo->numSBFs < count) {
R
root 已提交
35 36 37
    count = pInfo->numSBFs;
  }
  for (uint64_t i = 0; i < count; ++i) {
5
54liuyao 已提交
38 39
    int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
    SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
R
root 已提交
40 41 42 43 44 45 46
    taosArrayPush(pInfo->pTsSBFs, &tsSBF);
  }
}

static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
  if (count < pInfo->numSBFs - 1) {
    for (uint64_t i = 0; i < count; ++i) {
5
54liuyao 已提交
47
      SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
R
root 已提交
48
      tScalableBfDestroy(pTsSBFs);
5
54liuyao 已提交
49
      taosArrayRemove(pInfo->pTsSBFs, 0);
R
root 已提交
50 51 52 53 54 55 56 57 58 59 60 61
    }
  } else {
    taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy);
  }
  pInfo->minTS += pInfo->interval * count;
}

static int64_t adjustInterval(int64_t interval, int32_t precision) {
  int64_t val = interval;
  if (precision != TSDB_TIME_PRECISION_MILLI) {
    val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI);
  }
5
54liuyao 已提交
62 63

  if (val <= 0 || val > MAX_INTERVAL) {
R
root 已提交
64
    val = MAX_INTERVAL;
5
54liuyao 已提交
65 66 67 68 69 70
  } else if (val < MIN_INTERVAL) {
    val = MIN_INTERVAL;
  }

  if (precision != TSDB_TIME_PRECISION_MILLI) {
    val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision);
R
root 已提交
71 72 73 74
  }
  return val;
}

5
54liuyao 已提交
75 76 77 78 79 80 81 82
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
  if (watermark <= 0) {
    watermark = TMIN(originInt/adjInterval, 1) * adjInterval;
  } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
    watermark = MAX_NUM_SCALABLE_BF * adjInterval;
  }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
    watermark = MIN_NUM_SCALABLE_BF * adjInterval;
  }*/ // Todo(liuyao) save window info to tdb
5
54liuyao 已提交
83 84 85
  return watermark;
}

L
Liu Jicong 已提交
86
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) {
R
root 已提交
87 88 89 90 91 92 93 94 95 96 97 98
  return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
}

SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) {
  SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
  if (pInfo == NULL) {
    return NULL;
  }
  pInfo->pTsBuckets = NULL;
  pInfo->pTsSBFs = NULL;
  pInfo->minTS = -1;
  pInfo->interval = adjustInterval(interval, precision);
5
54liuyao 已提交
99
  pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
R
root 已提交
100

5
54liuyao 已提交
101
  uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
R
root 已提交
102

L
Liu Jicong 已提交
103
  pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
R
root 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117
  if (pInfo->pTsSBFs == NULL) {
    updateInfoDestroy(pInfo);
    return NULL;
  }
  pInfo->numSBFs = bfSize;
  windowSBfAdd(pInfo, bfSize);

  pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
  if (pInfo->pTsBuckets == NULL) {
    updateInfoDestroy(pInfo);
    return NULL;
  }

  TSKEY dumy = 0;
L
Liu Jicong 已提交
118
  for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
R
root 已提交
119 120 121 122 123 124
    taosArrayPush(pInfo->pTsBuckets, &dumy);
  }
  pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
  return pInfo;
}

L
Liu Jicong 已提交
125
static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
R
root 已提交
126 127 128 129 130 131
  if (ts <= 0) {
    return NULL;
  }
  if (pInfo->minTS < 0) {
    pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
  }
5
54liuyao 已提交
132 133 134 135
  int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
  if (index < 0) {
    return NULL;
  }
R
root 已提交
136 137 138 139 140 141 142 143
  if (index >= pInfo->numSBFs) {
    uint64_t count = index + 1 - pInfo->numSBFs;
    windowSBfDelete(pInfo, count);
    windowSBfAdd(pInfo, count);
    index = pInfo->numSBFs - 1;
  }
  SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
  if (res == NULL) {
5
54liuyao 已提交
144 145
    int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
    res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
R
root 已提交
146 147 148 149 150
    taosArrayPush(pInfo->pTsSBFs, &res);
  }
  return res;
}

5
54liuyao 已提交
151
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
L
Liu Jicong 已提交
152 153
  int32_t      res = TSDB_CODE_FAILED;
  uint64_t     index = ((uint64_t)tableId) % pInfo->numBuckets;
5
54liuyao 已提交
154 155 156 157 158 159
  TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
  if (ts < maxTs - pInfo->watermark) {
    // this window has been closed.
    return true;
  }

L
Liu Jicong 已提交
160
  SScalableBf *pSBf = getSBf(pInfo, ts);
R
root 已提交
161 162 163 164 165
  // pSBf may be a null pointer
  if (pSBf) {
    res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
  }

L
Liu Jicong 已提交
166
  if (maxTs < ts) {
R
root 已提交
167 168 169 170 171 172 173 174 175 176
    taosArraySet(pInfo->pTsBuckets, index, &ts);
    return false;
  }

  if (ts < pInfo->minTS) {
    return true;
  } else if (res == TSDB_CODE_SUCCESS) {
    return false;
  }

L
Liu Jicong 已提交
177
  // check from tsdb api
R
root 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191
  return true;
}

void updateInfoDestroy(SUpdateInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  taosArrayDestroy(pInfo->pTsBuckets);

  uint64_t size = taosArrayGetSize(pInfo->pTsSBFs);
  for (uint64_t i = 0; i < size; i++) {
    SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
    tScalableBfDestroy(pSBF);
  }
L
Liu Jicong 已提交
192

R
root 已提交
193 194
  taosArrayDestroy(pInfo->pTsSBFs);
  taosMemoryFree(pInfo);
L
Liu Jicong 已提交
195
}