streamUpdate.c 13.1 KB
Newer Older
R
root 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "query.h"
L
Liu Jicong 已提交
17
#include "tdatablock.h"
5
54liuyao 已提交
18
#include "tencode.h"
L
Liu Jicong 已提交
19
#include "tstreamUpdate.h"
R
root 已提交
20 21
#include "ttime.h"

L
Liu Jicong 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34
/* Target false-positive rate passed to tScalableBfInit() for every scalable bloom filter created in this file. */
#define DEFAULT_FALSE_POSITIVE   0.01
/* Number of TSKEY slots pre-filled in pTsBuckets; a table uid selects its slot via uid % numBuckets. */
#define DEFAULT_BUCKET_SIZE      1310720
/* Initial capacity handed to taosHashInit() for the per-table max-timestamp map (pMap). */
#define DEFAULT_MAP_CAPACITY     1310720
/* Entry-count threshold for pMap: once reached, new tables are no longer added to the map and
 * updateInfoIsTableInserted() reports every table as inserted. */
#define DEFAULT_MAP_SIZE         (DEFAULT_MAP_CAPACITY * 10)
/* Multiplier applied to pInfo->interval when estimating expected entries per window filter
 * (the product is then capped by adjustExpEntries()). */
#define ROWS_PER_MILLISECOND     1
/* Upper bound on the watermark expressed as a number of intervals (see adjustWatermark()),
 * which in turn bounds the number of per-window filters. */
#define MAX_NUM_SCALABLE_BF      100000
/* NOTE(review): not referenced in the code visible here; confirm whether it is still needed. */
#define MIN_NUM_SCALABLE_BF      10
/* NOTE(review): not referenced in the code visible here; confirm whether it is still needed. */
#define DEFAULT_PREADD_BUCKET    1
/* Clamp bounds applied by adjustInterval() to the interval after it is converted to milliseconds.
 * MILLISECOND_PER_MINUTE / MILLISECOND_PER_SECOND presumably come from ttime.h — confirm. */
#define MAX_INTERVAL             MILLISECOND_PER_MINUTE
#define MIN_INTERVAL             (MILLISECOND_PER_SECOND * 10)
/* Cap on the expected-entry count used to size each scalable bloom filter (see adjustExpEntries()). */
#define DEFAULT_EXPECTED_ENTRIES 10000

/* Clamp the requested expected-entry count for a scalable bloom filter to DEFAULT_EXPECTED_ENTRIES. */
static int64_t adjustExpEntries(int64_t entries) {
  if (entries > DEFAULT_EXPECTED_ENTRIES) {
    return DEFAULT_EXPECTED_ENTRIES;
  }
  return entries;
}
R
root 已提交
35 36

/*
 * Append freshly initialised scalable bloom filters to pInfo->pTsSBFs.
 *
 * At most pInfo->numSBFs filters are appended, even if `count` is larger.
 * Every filter is sized from the window interval (capped by adjustExpEntries()).
 * A NULL returned by tScalableBfInit() is still pushed, keeping the array length
 * in step with the number of windows; getSBf() deals with NULL slots.
 */
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
  uint64_t toAdd = (count > pInfo->numSBFs) ? pInfo->numSBFs : count;
  int64_t  expEntries = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);

  for (uint64_t n = 0; n < toAdd; n++) {
    SScalableBf *pNewSBf = tScalableBfInit(expEntries, DEFAULT_FALSE_POSITIVE);
    taosArrayPush(pInfo->pTsSBFs, &pNewSBf);
  }
}

H
Haojun Liao 已提交
47 48 49 50 51
static void clearItemHelper(void* p) {
  SScalableBf** pBf = p;
  tScalableBfDestroy(*pBf);
}

R
root 已提交
52
/*
 * Retire the `count` oldest window filters and advance pInfo->minTS by
 * `count` intervals.
 *
 * When `count` is at least pInfo->numSBFs every filter is released at once via
 * taosArrayClearEx(); otherwise filters are destroyed one by one from the front.
 */
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
  if (count >= pInfo->numSBFs) {
    taosArrayClearEx(pInfo->pTsSBFs, clearItemHelper);
  } else {
    for (uint64_t removed = 0; removed < count; removed++) {
      SScalableBf *pOldest = taosArrayGetP(pInfo->pTsSBFs, 0);
      tScalableBfDestroy(pOldest);
      taosArrayRemove(pInfo->pTsSBFs, 0);
    }
  }
  pInfo->minTS += pInfo->interval * count;
}

static int64_t adjustInterval(int64_t interval, int32_t precision) {
  int64_t val = interval;
  if (precision != TSDB_TIME_PRECISION_MILLI) {
    val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI);
  }
5
54liuyao 已提交
70 71

  if (val <= 0 || val > MAX_INTERVAL) {
R
root 已提交
72
    val = MAX_INTERVAL;
5
54liuyao 已提交
73 74 75 76 77 78
  } else if (val < MIN_INTERVAL) {
    val = MIN_INTERVAL;
  }

  if (precision != TSDB_TIME_PRECISION_MILLI) {
    val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision);
R
root 已提交
79 80 81 82
  }
  return val;
}

5
54liuyao 已提交
83
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
5
54liuyao 已提交
84
  if (watermark <= adjInterval) {
L
Liu Jicong 已提交
85
    watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
5
54liuyao 已提交
86 87
  } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
    watermark = MAX_NUM_SCALABLE_BF * adjInterval;
5
54liuyao 已提交
88
  }
5
54liuyao 已提交
89 90 91
  return watermark;
}

L
Liu Jicong 已提交
92
/* Convenience constructor: unpack an SInterval descriptor and delegate to updateInfoInit(). */
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) {
  SUpdateInfo *pInfo = updateInfoInit(pInterval->interval, pInterval->precision, watermark);
  return pInfo;
}

SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) {
  SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
  if (pInfo == NULL) {
    return NULL;
  }
  pInfo->pTsBuckets = NULL;
  pInfo->pTsSBFs = NULL;
  pInfo->minTS = -1;
  pInfo->interval = adjustInterval(interval, precision);
5
54liuyao 已提交
105
  pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
R
root 已提交
106

5
54liuyao 已提交
107
  uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
R
root 已提交
108

L
Liu Jicong 已提交
109
  pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
R
root 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123
  if (pInfo->pTsSBFs == NULL) {
    updateInfoDestroy(pInfo);
    return NULL;
  }
  pInfo->numSBFs = bfSize;
  windowSBfAdd(pInfo, bfSize);

  pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
  if (pInfo->pTsBuckets == NULL) {
    updateInfoDestroy(pInfo);
    return NULL;
  }

  TSKEY dumy = 0;
L
Liu Jicong 已提交
124
  for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
R
root 已提交
125 126 127
    taosArrayPush(pInfo->pTsBuckets, &dumy);
  }
  pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
5
54liuyao 已提交
128
  pInfo->pCloseWinSBF = NULL;
5
54liuyao 已提交
129
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
130
  pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
131 132 133
  pInfo->maxVersion = 0;
  pInfo->scanGroupId = 0;
  pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
R
root 已提交
134 135 136
  return pInfo;
}

L
Liu Jicong 已提交
137
/*
 * Return the scalable bloom filter for the window that contains `ts`.
 *
 * The first positive timestamp seen fixes pInfo->minTS (aligned down to the
 * interval). A timestamp beyond the newest window slides the window array
 * forward: the oldest filters are retired and fresh ones appended, so `ts`
 * lands in the last slot.
 *
 * Returns NULL for ts <= 0, for ts earlier than pInfo->minTS, or when a
 * missing filter cannot be allocated.
 */
static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
  if (ts <= 0) {
    return NULL;
  }
  if (pInfo->minTS < 0) {
    pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
  }
  // Compare before dividing: integer division truncates toward zero, so a ts
  // less than one interval before minTS would otherwise map to window 0.
  if (ts < pInfo->minTS) {
    return NULL;
  }
  int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
  if (index >= pInfo->numSBFs) {
    uint64_t count = index + 1 - pInfo->numSBFs;
    windowSBfDelete(pInfo, count);
    windowSBfAdd(pInfo, count);
    index = pInfo->numSBFs - 1;
  }

  SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
  if (res == NULL) {
    // The slot is either empty (an earlier tScalableBfInit() failed) or missing.
    int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
    res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
    if (res == NULL) {
      return NULL;
    }
    if ((size_t)index < taosArrayGetSize(pInfo->pTsSBFs)) {
      // Fill the existing slot in place so the filter stays bound to its window.
      taosArraySet(pInfo->pTsSBFs, index, &res);
    } else if (taosArrayPush(pInfo->pTsSBFs, &res) == NULL) {
      tScalableBfDestroy(res);
      return NULL;
    }
  }
  return res;
}

L
Liu Jicong 已提交
163 164 165 166 167 168
/*
 * Report whether `tbUid` already has an entry in the per-table map.
 * Once the map holds DEFAULT_MAP_SIZE entries or more, every table is reported
 * as inserted.
 */
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
  if (taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)) != NULL) {
    return true;
  }
  return taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE;
}

L
Liu Jicong 已提交
169 170 171
/*
 * Record every row of `pBlock` in the window filters and update the table's
 * maximum timestamp in pInfo->pMap.
 *
 * primaryTsCol: index of the TSKEY column in pBlock->pDataBlock.
 * Returns the largest timestamp in the block, or INT64_MIN for a NULL/empty
 * block or a missing timestamp column.
 */
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
  if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
  TSKEY   maxTs = INT64_MIN;
  int64_t tbUid = pBlock->info.id.uid;

  SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
  if (pColDataInfo == NULL) return INT64_MIN;

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i];
    maxTs = TMAX(maxTs, ts);
    SScalableBf *pSBf = getSBf(pInfo, ts);
    if (pSBf) {
      SUpdateKey updateKey = {
          .tbUid = tbUid,
          .ts = ts,
      };
      tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
    }
  }

  // The map holds the largest timestamp seen per table (updateInfoIsUpdated()
  // also only ever raises it), so overwrite only when this block goes further.
  TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
  if (pMaxTs == NULL || *pMaxTs < maxTs) {
    taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
  }
  return maxTs;
}

195
/*
 * Decide whether the row (tableId, ts) should be treated as an update of
 * previously seen data (true) or as new data (false).
 *
 * Order of checks:
 *  1. If ts is older than the table's bucket max minus the watermark, the
 *     window is considered closed. The answer comes from pCloseWinSBF when it
 *     exists; otherwise the row is reported as an update.
 *  2. The key is inserted into the window filter for ts (if any).
 *  3. A table without a map entry (while the map has room), or one whose stored
 *     max is below ts, gets ts recorded and is reported as new.
 *  4. For a table without a map entry (map full), a ts above its bucket max
 *     raises the bucket and is reported as new.
 *  5. A ts before minTS is an update; otherwise the filter insertion result decides.
 *
 * NOTE(review): the code treats TSDB_CODE_SUCCESS from tScalableBfPut() as
 * "key was not present yet"; confirm against the bloom filter implementation.
 */
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
  SUpdateKey key = {.tbUid = tableId, .ts = ts};
  TSKEY     *pTableMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
  uint64_t   bucketIdx = ((uint64_t)tableId) % pInfo->numBuckets;
  TSKEY      bucketMaxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, bucketIdx);

  if (ts < bucketMaxTs - pInfo->watermark) {
    // The window this row belongs to has already been closed.
    if (pInfo->pCloseWinSBF == NULL) {
      return true;
    }
    return tScalableBfPut(pInfo->pCloseWinSBF, &key, sizeof(SUpdateKey)) != TSDB_CODE_SUCCESS;
  }

  int32_t      putCode = TSDB_CODE_FAILED;
  SScalableBf *pWinSBf = getSBf(pInfo, ts);  // may be NULL
  if (pWinSBf != NULL) {
    putCode = tScalableBfPut(pWinSBf, &key, sizeof(SUpdateKey));
  }

  int32_t mapSize = taosHashGetSize(pInfo->pMap);
  if (pTableMaxTs == NULL) {
    if (mapSize < DEFAULT_MAP_SIZE) {
      taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
      return false;
    }
    if (bucketMaxTs < ts) {
      taosArraySet(pInfo->pTsBuckets, bucketIdx, &ts);
      return false;
    }
  } else if (*pTableMaxTs < ts) {
    taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
    return false;
  }

  if (ts < pInfo->minTS) {
    return true;
  }
  // A failed insertion (or no filter at all) is reported as an update;
  // a precise answer would need a check against the tsdb api.
  return putCode != TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
245 246 247
/*
 * Remember the time window, group and version of the current scan so that
 * updateInfoIgnore() can later skip events fully covered by it.
 */
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
  // skey/ekey are signed TSKEY values: print them with PRId64, not PRIu64.
  qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId,
         pWin->skey, pWin->ekey, version);
  pInfo->scanWindow = *pWin;
  pInfo->scanGroupId = groupId;
  pInfo->maxVersion = version;
}

L
Liu Jicong 已提交
253
/*
 * Return true when the window [pWin->skey, pWin->ekey] of `groupId` at
 * `version` lies entirely inside the scan range recorded by
 * updateInfoSetScanRange() and is not newer than its version.
 * Returns false when pInfo is NULL.
 */
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
  if (!pInfo) {
    return false;
  }
  // skey/ekey are signed TSKEY values: print them with PRId64, not PRIu64.
  qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId,
         pWin->skey, pWin->ekey, version);
  if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
      version <= pInfo->maxVersion) {
    qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId,
           pWin->skey, pWin->ekey, version);
    return true;
  }
  return false;
}

R
root 已提交
268 269 270 271 272 273 274 275 276 277 278
/*
 * Release an SUpdateInfo and everything it owns: the timestamp buckets, every
 * window filter, the per-table map, the closed-window filter and the struct
 * itself. Safe to call with NULL.
 */
void updateInfoDestroy(SUpdateInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  taosArrayDestroy(pInfo->pTsBuckets);

  uint64_t size = taosArrayGetSize(pInfo->pTsSBFs);
  for (uint64_t i = 0; i < size; i++) {
    SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
    tScalableBfDestroy(pSBF);
  }
  taosArrayDestroy(pInfo->pTsSBFs);

  taosHashCleanup(pInfo->pMap);

  // pCloseWinSBF is allocated into pInfo by updateInfoAddCloseWindowSBF() and
  // updateInfoDeserialize(); release it here so it does not leak.
  if (pInfo->pCloseWinSBF != NULL) {
    tScalableBfDestroy(pInfo->pCloseWinSBF);
    pInfo->pCloseWinSBF = NULL;
  }

  taosMemoryFree(pInfo);
}
5
54liuyao 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299

/* Lazily create the closed-window filter; does nothing if it already exists. */
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
  if (pInfo->pCloseWinSBF == NULL) {
    int64_t expEntries = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
    pInfo->pCloseWinSBF = tScalableBfInit(expEntries, DEFAULT_FALSE_POSITIVE);
  }
}

/* Destroy the closed-window filter, if any, and clear the pointer; NULL-safe for pInfo. */
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
  if (pInfo != NULL && pInfo->pCloseWinSBF != NULL) {
    tScalableBfDestroy(pInfo->pCloseWinSBF);
    pInfo->pCloseWinSBF = NULL;
  }
}
5
54liuyao 已提交
300 301

int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
302
  ASSERT(pInfo);
5
54liuyao 已提交
303 304 305 306 307 308 309
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, bufLen);
  if (tStartEncode(&encoder) < 0) return -1;

  int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
  if (tEncodeI32(&encoder, size) < 0) return -1;
  for (int32_t i = 0; i < size; i++) {
L
Liu Jicong 已提交
310
    TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i);
5
54liuyao 已提交
311 312 313 314 315 316 317 318
    if (tEncodeI64(&encoder, *pTs) < 0) return -1;
  }

  if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1;

  int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
  if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
  for (int32_t i = 0; i < sBfSize; i++) {
L
Liu Jicong 已提交
319
    SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
5
54liuyao 已提交
320 321 322 323 324 325 326
    if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
  }

  if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1;
  if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
  if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
  if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
L
Liu Jicong 已提交
327

5
54liuyao 已提交
328 329 330 331
  if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;

  int32_t mapSize = taosHashGetSize(pInfo->pMap);
  if (tEncodeI32(&encoder, mapSize) < 0) return -1;
L
Liu Jicong 已提交
332
  void  *pIte = NULL;
5
54liuyao 已提交
333 334
  size_t keyLen = 0;
  while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
L
Liu Jicong 已提交
335 336 337
    void *key = taosHashGetKey(pIte, &keyLen);
    if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
    if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
5
54liuyao 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
  }

  if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
  if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1;
  if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1;
  if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1;

  tEndEncode(&encoder);

  int32_t tlen = encoder.pos;
  tEncoderClear(&encoder);
  return tlen;
}

/*
 * Decode `buf` (length `bufLen`), produced by updateInfoSerialize(), into `pInfo`.
 *
 * Allocates pInfo->pTsBuckets, pInfo->pTsSBFs, pInfo->pCloseWinSBF and
 * pInfo->pMap. On failure, whatever was already attached to pInfo is left
 * there for the caller to release (e.g. with updateInfoDestroy()).
 *
 * Returns 0 on success, -1 on a decode error, negative encoded size or
 * allocation failure. The decoder is released on every path.
 */
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
  ASSERT(pInfo);
  SDecoder   decoder = {0};
  int32_t    code = -1;
  int32_t    size = 0;
  int32_t    sBfSize = 0;
  int32_t    mapSize = 0;
  TSKEY      ts = INT64_MIN;
  uint64_t   uid = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);

  tDecoderInit(&decoder, buf, bufLen);
  if (tStartDecode(&decoder) < 0) goto _end;

  if (tDecodeI32(&decoder, &size) < 0 || size < 0) goto _end;
  pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
  if (pInfo->pTsBuckets == NULL) goto _end;
  for (int32_t i = 0; i < size; i++) {
    if (tDecodeI64(&decoder, &ts) < 0) goto _end;
    if (taosArrayPush(pInfo->pTsBuckets, &ts) == NULL) goto _end;
  }

  if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) goto _end;

  if (tDecodeI32(&decoder, &sBfSize) < 0 || sBfSize < 0) goto _end;
  pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
  if (pInfo->pTsSBFs == NULL) goto _end;
  for (int32_t i = 0; i < sBfSize; i++) {
    SScalableBf *pSBf = tScalableBfDecode(&decoder);
    if (pSBf == NULL) goto _end;
    if (taosArrayPush(pInfo->pTsSBFs, &pSBf) == NULL) {
      tScalableBfDestroy(pSBf);  // not yet owned by pInfo
      goto _end;
    }
  }

  if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) goto _end;
  if (tDecodeI64(&decoder, &pInfo->interval) < 0) goto _end;
  if (tDecodeI64(&decoder, &pInfo->watermark) < 0) goto _end;
  if (tDecodeI64(&decoder, &pInfo->minTS) < 0) goto _end;
  // NULL is a legitimate value here (no closed-window filter was serialized).
  pInfo->pCloseWinSBF = tScalableBfDecode(&decoder);

  if (tDecodeI32(&decoder, &mapSize) < 0 || mapSize < 0) goto _end;
  pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
  if (pInfo->pMap == NULL) goto _end;
  ts = INT64_MIN;
  for (int32_t i = 0; i < mapSize; i++) {
    if (tDecodeU64(&decoder, &uid) < 0) goto _end;
    if (tDecodeI64(&decoder, &ts) < 0) goto _end;
    taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
  }
  ASSERT(mapSize == taosHashGetSize(pInfo->pMap));

  if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) goto _end;
  if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) goto _end;
  if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) goto _end;
  if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) goto _end;

  tEndDecode(&decoder);
  code = 0;

_end:
  tDecoderClear(&decoder);
  return code;
}