streamState.c 25.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "streamInc.h"
18
#include "tcommon.h"
19
#include "tcompare.h"
20 21
#include "ttimer.h"

22 23 24 25 26 27
// todo refactor
typedef struct SStateKey {
  SWinKey key;
  int64_t opNum;
} SStateKey;

5
54liuyao 已提交
28 29 30 31 32
typedef struct SStateSessionKey {
  SSessionKey key;
  int64_t     opNum;
} SStateSessionKey;

33
static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

5
54liuyao 已提交
71 72 73 74 75 76 77 78 79 80
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

81
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
82 83
}

84
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

  if (pWin1->key.ts > pWin2->key.ts) {
    return 1;
  } else if (pWin1->key.ts < pWin2->key.ts) {
    return -1;
  }

  if (pWin1->key.groupId > pWin2->key.groupId) {
    return 1;
  } else if (pWin1->key.groupId < pWin2->key.groupId) {
    return -1;
  }

  return 0;
}

109 110 111
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
112 113 114 115 116
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
117 118 119 120 121 122
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }
L
Liu Jicong 已提交
123

124
  char statePath[1024];
L
Liu Jicong 已提交
125 126 127
  if (!specPath) {
    sprintf(statePath, "%s/%d", path, pTask->taskId);
  } else {
128 129
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
130
  }
5
54liuyao 已提交
131
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) {
132 133 134 135
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
136 137
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
138 139 140
    goto _err;
  }

5
54liuyao 已提交
141
  // todo refactor
5
54liuyao 已提交
142 143
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
144 145 146
    goto _err;
  }

5
54liuyao 已提交
147 148
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
149 150 151
    goto _err;
  }

5
54liuyao 已提交
152 153
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
154 155 156
    goto _err;
  }

5
54liuyao 已提交
157 158
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
159 160 161
    goto _err;
  }

162
  if (streamStateBegin(pState) < 0) {
163 164 165
    goto _err;
  }

5
54liuyao 已提交
166
  pState->pTdbState->pOwner = pTask;
167 168 169 170

  return pState;

_err:
5
54liuyao 已提交
171 172 173 174 175 176 177
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
178 179 180 181
  return NULL;
}

void streamStateClose(SStreamState* pState) {
182 183
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
184 185 186 187 188 189
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbClose(pState->pTdbState->db);
190

5
54liuyao 已提交
191
  streamStateDestroy(pState);
192 193 194
}

int32_t streamStateBegin(SStreamState* pState) {
195
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
196
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
197
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
198 199 200 201 202 203
    return -1;
  }
  return 0;
}

int32_t streamStateCommit(SStreamState* pState) {
204
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
205 206
    return -1;
  }
207
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
208 209
    return -1;
  }
210

211
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
212
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
213 214 215 216 217 218
    return -1;
  }
  return 0;
}

int32_t streamStateAbort(SStreamState* pState) {
219
  if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
220 221
    return -1;
  }
222

223
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
224
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
225 226 227 228 229
    return -1;
  }
  return 0;
}

230
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
231
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
232 233
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
5
54liuyao 已提交
234
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
235 236 237
}

int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
238
  return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
239 240
}

241
// todo refactor
242
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
243
  SStateKey sKey = {.key = *key, .opNum = pState->number};
244
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
245
}
5
54liuyao 已提交
246 247 248

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
249
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
5
54liuyao 已提交
250 251
}

252
// todo refactor
253
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
254
  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
255
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
256 257
}

5
54liuyao 已提交
258 259
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
5
54liuyao 已提交
260
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
5
54liuyao 已提交
261 262
}

263
// todo refactor
264
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
265
  SStateKey sKey = {.key = *key, .opNum = pState->number};
266
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
267 268
}

269 270 271 272 273 274 275
int32_t streamStateClear(SStreamState* pState) {
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
    SWinKey          delKey = {0};
    int32_t          code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
276
    streamStateFreeCur(pCur);
277 278 279 280 281 282 283 284 285 286 287
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

5
54liuyao 已提交
288 289
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
290
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
5
54liuyao 已提交
291 292
}

293 294 295 296 297 298
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
299 300 301
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
302 303 304 305
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
5
54liuyao 已提交
306 307 308
  if (!pVal) {
    return 0;
  }
309 310 311 312
  streamFreeVal(pVal);
  return 0;
}

313
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
314 315
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
316
  tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
317

5
54liuyao 已提交
318
  int32_t   c = 0;
319
  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
320
  tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
321
  if (c != 0) {
5
54liuyao 已提交
322
    streamStateFreeCur(pCur);
323 324
    return NULL;
  }
325
  pCur->number = pState->number;
326
  return pCur;
327 328
}

5
54liuyao 已提交
329 330 331
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
332
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
333

5
54liuyao 已提交
334
  int32_t c = 0;
5
54liuyao 已提交
335 336
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
337
    streamStateFreeCur(pCur);
5
54liuyao 已提交
338 339 340 341 342 343 344 345 346 347 348 349
    return NULL;
  }
  return pCur;
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
350
    streamStateFreeCur(pCur);
5
54liuyao 已提交
351 352 353 354
  }
  return NULL;
}

355
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
  int32_t          kLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (!pCur) {
    return -1;
  }
375 376 377 378 379 380 381
  const SWinKey* pKTmp = NULL;
  int32_t        kLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
382 383
}

5
54liuyao 已提交
384
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
5
54liuyao 已提交
385 386 387
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
388
  uint64_t groupId = pKey->groupId;
389
  int32_t  code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
390 391 392 393 394 395 396 397
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
}

398 399 400 401 402 403
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
  int32_t          code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
404
  streamStateFreeCur(pCur);
405 406 407 408
  streamStateDel(pState, &tmp);
  return code;
}

409 410 411 412 413 414 415 416 417 418
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
  //
  return tdbTbcMoveToFirst(pCur->pCur);
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
  //
  return tdbTbcMoveToLast(pCur->pCur);
}

419 420 421 422 423 424
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
425
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
426
    streamStateFreeCur(pCur);
427 428 429 430
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
431
  int32_t   c = 0;
432
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
433
    streamStateFreeCur(pCur);
434 435 436 437 438
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
439
    streamStateFreeCur(pCur);
440 441 442 443 444 445
    return NULL;
  }

  return pCur;
}

5
54liuyao 已提交
446
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
447
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
448
  if (!pCur) {
449 450
    return NULL;
  }
5
54liuyao 已提交
451
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
452
    streamStateFreeCur(pCur);
5
54liuyao 已提交
453 454
    return NULL;
  }
455

5
54liuyao 已提交
456
  int32_t c = 0;
457
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
458
    streamStateFreeCur(pCur);
459 460 461 462 463
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
464
    streamStateFreeCur(pCur);
465 466 467 468 469 470
    return NULL;
  }

  return pCur;
}

5
54liuyao 已提交
471
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
472 473 474 475
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
476
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
477
    streamStateFreeCur(pCur);
5
54liuyao 已提交
478 479
    return NULL;
  }
480

5
54liuyao 已提交
481
  int32_t c = 0;
482
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
483
    streamStateFreeCur(pCur);
484 485 486 487 488
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
489
    streamStateFreeCur(pCur);
490 491 492 493 494 495 496
    return NULL;
  }

  return pCur;
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
5
54liuyao 已提交
497 498 499
  if (!pCur) {
    return -1;
  }
500 501 502 503 504 505
  //
  return tdbTbcMoveToNext(pCur->pCur);
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
  //
506 507 508
  if (!pCur) {
    return -1;
  }
509 510
  return tdbTbcMoveToPrev(pCur->pCur);
}
511
void streamStateFreeCur(SStreamStateCur* pCur) {
512 513 514
  if (!pCur) {
    return;
  }
515 516 517 518 519
  tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

void streamFreeVal(void* val) { tdbFree(val); }
5
54liuyao 已提交
520 521 522

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
523
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
524
                     pState->pTdbState->txn);
5
54liuyao 已提交
525 526 527
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
528 529
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
5
54liuyao 已提交
530
  void*            tmp = NULL;
531 532
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
5
54liuyao 已提交
533 534 535 536 537 538 539
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
540 541
  }
  streamStateFreeCur(pCur);
542
  return code;
5
54liuyao 已提交
543 544 545 546
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
547
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
5
54liuyao 已提交
548 549
}

550
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
5
54liuyao 已提交
551 552 553 554 555
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
556
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
557 558 559 560 561 562 563 564 565 566
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
567
  if (c >= 0) return pCur;
5
54liuyao 已提交
568 569 570 571 572 573 574 575 576

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
}

577 578 579 580 581 582
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
583
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
}

5
54liuyao 已提交
605 606 607 608 609 610
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
611
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
612 613 614 615 616 617 618 619 620 621
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
622
  if (c < 0) return pCur;
5
54liuyao 已提交
623 624 625 626 627 628 629 630 631

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
}

632
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
5
54liuyao 已提交
633 634 635
  if (!pCur) {
    return -1;
  }
636 637 638
  SStateSessionKey* pKTmp = NULL;
  int32_t           kLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
639 640 641 642 643 644 645 646 647 648 649 650 651
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
}

int32_t streamStateSessionClear(SStreamState* pState) {
652 653
  SSessionKey      key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
654 655 656 657
  while (1) {
    SSessionKey delKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
658
    int32_t     code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
659
    if (code == 0 && size > 0) {
5
54liuyao 已提交
660 661 662 663 664 665 666 667 668 669 670
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
}

671 672 673 674 675 676
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
677
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
678 679
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
680 681
  }

682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
712 713
    }
  }
714

715
  streamStateFreeCur(pCur);
716
  return -1;
717 718
}

719 720
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
5
54liuyao 已提交
721
  // todo refactor
722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
  int32_t     res = 0;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
  void*   tmp = tdbRealloc(NULL, valSize);
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  int32_t          code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
744
    streamStateFreeCur(pCur);
745
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
746
  }
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
764
  streamStateFreeCur(pCur);
765
  return res;
5
54liuyao 已提交
766 767 768 769 770
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
771
  int32_t     res = 0;
5
54liuyao 已提交
772 773 774 775 776 777 778
  SSessionKey tmpKey = *key;
  int32_t     valSize = *pVLen;
  void*       tmp = tdbRealloc(NULL, valSize);
  if (!tmp) {
    return -1;
  }

779
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
780
  int32_t          code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
781 782 783
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
784
      streamStateSessionDel(pState, key);
785 786
      goto _end;
    }
5
54liuyao 已提交
787 788 789 790

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
791
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
792 793
      goto _end;
    }
5
54liuyao 已提交
794 795 796 797 798 799

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
800 801
  }

802
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
803
  if (code == 0) {
5
54liuyao 已提交
804 805 806
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
807
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820 821
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
5
54liuyao 已提交
822

823
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
5
54liuyao 已提交
824
  tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
825
              pState->pTdbState->txn);
826 827 828 829 830
  return 0;
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
  int32_t len;
5
54liuyao 已提交
831 832 833 834 835 836
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
}

void streamStateDestroy(SStreamState* pState) {
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
837 838
}

5
54liuyao 已提交
839 840 841 842 843 844 845
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
846
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
847 848 849 850 851 852
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
853 854 855
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
856
  if (code != 0) {
857
    streamStateFreeCur(pCur);
5
54liuyao 已提交
858 859 860 861 862 863 864 865 866 867 868 869 870 871
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
872
      streamStateFreeCur(pCur);
5
54liuyao 已提交
873 874 875 876 877 878
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
879
  streamStateFreeCur(pCur);
5
54liuyao 已提交
880 881 882
  return dumpBuf;
}
#endif