streamState.c 25.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "streamInc.h"
18
#include "tcommon.h"
19
#include "tcompare.h"
20 21
#include "ttimer.h"

22 23 24 25 26 27
// todo refactor
// Key of the "state.db" table: a window key qualified by an operator number.
// The put/get/del helpers in this file fill opNum from SStreamState::number.
typedef struct SStateKey {
  SWinKey key;    // window key (ts, groupId)
  int64_t opNum;  // compared first by stateKeyCmpr
} SStateKey;

5
54liuyao 已提交
28 29 30 31 32
// Key of the "session.state.db" table: a session key qualified by an operator
// number. The session helpers in this file fill opNum from SStreamState::number.
typedef struct SStateSessionKey {
  SSessionKey key;    // session window (win.skey, win.ekey) and groupId
  int64_t     opNum;  // compared first by stateSessionKeyCmpr
} SStateSessionKey;

33
static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Exact (total-order) comparison of two session keys.
// Compares groupId, then win.skey, then win.ekey; the first field that differs
// decides the result. Returns 1, -1 or 0.
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
  if (pWin1->groupId != pWin2->groupId) {
    return (pWin1->groupId > pWin2->groupId) ? 1 : -1;
  }

  if (pWin1->win.skey != pWin2->win.skey) {
    return (pWin1->win.skey > pWin2->win.skey) ? 1 : -1;
  }

  if (pWin1->win.ekey != pWin2->win.ekey) {
    return (pWin1->win.ekey > pWin2->win.ekey) ? 1 : -1;
  }

  return 0;
}

5
54liuyao 已提交
71 72 73 74 75 76 77 78 79 80
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

81
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
82 83
}

84
// Key comparator registered for "state.db".
// Both keys are read as SStateKey; kLen1/kLen2 are not used.
// Orders by opNum, then key.ts, then key.groupId. Returns 1, -1 or 0.
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  const SStateKey* pLeft = (const SStateKey*)pKey1;
  const SStateKey* pRight = (const SStateKey*)pKey2;

  if (pLeft->opNum != pRight->opNum) {
    return (pLeft->opNum > pRight->opNum) ? 1 : -1;
  }

  if (pLeft->key.ts != pRight->key.ts) {
    return (pLeft->key.ts > pRight->key.ts) ? 1 : -1;
  }

  if (pLeft->key.groupId != pRight->key.groupId) {
    return (pLeft->key.groupId > pRight->key.groupId) ? 1 : -1;
  }

  return 0;
}

109 110 111
// Opens (or creates) the tdb-backed state store for a stream task.
//
//   path     - base directory; used verbatim when specPath is true, otherwise
//              "<path>/<pTask->taskId>" is used
//   pTask    - owning task; stored in pTdbState->pOwner on success
//   specPath - whether path already names the state directory
//   szPage   - page size passed to tdbOpen; a negative value selects 4096
//   pages    - page count passed to tdbOpen; a negative value selects 256
//
// On success the six state tables are open and a write transaction has been
// started via streamStateBegin. Returns NULL on any failure; terrno is set
// here only for allocation failures.
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;

  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }

  char statePath[1024];
  if (!specPath) {
    // Bounded formatting: a long base path must not overflow statePath.
    int32_t n = snprintf(statePath, sizeof(statePath), "%s/%d", path, pTask->taskId);
    if (n < 0 || n >= (int32_t)sizeof(statePath)) {
      // The path does not fit; refuse to open a truncated (wrong) directory.
      goto _err;
    }
  } else {
    memset(statePath, 0, sizeof(statePath));
    tstrncpy(statePath, path, sizeof(statePath));
  }

  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
    goto _err;
  }

  // open state storage backend
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
    goto _err;
  }

  // todo refactor
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

  if (streamStateBegin(pState) < 0) {
    goto _err;
  }

  pState->pTdbState->pOwner = pTask;

  return pState;

_err:
  // Handles that were never opened are still NULL here (pTdbState was
  // calloc'ed). NOTE(review): assumes tdbTbClose/tdbClose tolerate NULL, as the
  // pre-existing tdbOpen failure path already relied on - confirm.
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbTbClose(pState->pTdbState->pParTagDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
  return NULL;
}

// Commits the current transaction, closes every state table and the tdb
// environment, then releases pState itself. Return values of the tdb calls are
// not inspected. pState must not be used afterwards.
void streamStateClose(SStreamState* pState) {
  STdbState* pTdb = pState->pTdbState;

  // Flush the transaction opened by streamStateBegin/Commit/Abort.
  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  // Tables first, environment last.
  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);

  streamStateDestroy(pState);
}

// Starts a write transaction (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) and
// stores it in pTdbState->txn. If tdbBegin fails, tdbAbort is called on the
// transaction handle and -1 is returned; otherwise returns 0.
int32_t streamStateBegin(SStreamState* pState) {
  STdbState* pTdb = pState->pTdbState;

  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) >= 0) {
    return 0;
  }

  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
}

// Commits the current transaction (tdbCommit followed by tdbPostCommit) and
// immediately begins a new write transaction in its place.
// Returns 0 on success, -1 as soon as any of the three steps fails.
int32_t streamStateCommit(SStreamState* pState) {
  STdbState* pTdb = pState->pTdbState;

  // Short-circuit evaluation keeps the commit -> post-commit order.
  if (tdbCommit(pTdb->db, pTdb->txn) < 0 || tdbPostCommit(pTdb->db, pTdb->txn) < 0) {
    return -1;
  }

  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  return 0;
}

// Aborts the current transaction and begins a new write transaction in its
// place. Returns 0 on success, -1 if either tdbAbort or tdbBegin fails.
int32_t streamStateAbort(SStreamState* pState) {
  STdbState* pTdb = pState->pTdbState;

  if (tdbAbort(pTdb->db, pTdb->txn) < 0) {
    return -1;
  }

  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  return 0;
}

237
// Upserts (key -> value[0..vLen)) into the function-state table within the
// current transaction. Returns the result of tdbTbUpsert.
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbUpsert(pTdb->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pTdb->txn);
  return code;
}
// Looks up key in the function-state table. The value pointer and length are
// delivered through pVal/pVLen by tdbTbGet, whose result is returned.
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbGet(pTdb->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
  return code;
}

// Deletes key from the function-state table within the current transaction.
// Returns the result of tdbTbDelete.
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbDelete(pTdb->pFuncStateDb, key, sizeof(STupleKey), pTdb->txn);
  return code;
}

248
// todo refactor
249
// Upserts a value into the state table under (key, pState->number) within the
// current transaction. Returns the result of tdbTbUpsert.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  STdbState* pTdb = pState->pTdbState;
  SStateKey  fullKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pStateDb, &fullKey, sizeof(SStateKey), value, vLen, pTdb->txn);
}
5
54liuyao 已提交
253 254 255

// todo refactor
// Upserts a value into the fill-state table under key (no operator number is
// attached) within the current transaction. Returns the result of tdbTbUpsert.
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(SWinKey), value, vLen, pTdb->txn);
  return code;
}

259
// todo refactor
260
// Looks up (key, pState->number) in the state table. The value pointer and
// length are delivered through pVal/pVLen by tdbTbGet, whose result is returned.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  STdbState* pTdb = pState->pTdbState;
  SStateKey  fullKey = {.key = *key, .opNum = pState->number};

  return tdbTbGet(pTdb->pStateDb, &fullKey, sizeof(SStateKey), pVal, pVLen);
}

5
54liuyao 已提交
265 266
// todo refactor
// Looks up key in the fill-state table. The value pointer and length are
// delivered through pVal/pVLen by tdbTbGet, whose result is returned.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbGet(pTdb->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
  return code;
}

270
// todo refactor
271
// Deletes (key, pState->number) from the state table within the current
// transaction. Returns the result of tdbTbDelete.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
  STdbState* pTdb = pState->pTdbState;
  SStateKey  fullKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pStateDb, &fullKey, sizeof(SStateKey), pTdb->txn);
}

276 277 278 279 280 281 282
// Removes state-table entries of the current operator number.
// A zero key {ts = 0, groupId = 0} is first written as an anchor; then the
// entry returned by streamStateSeekKeyNext(anchor) is deleted repeatedly until
// no further entry with the current operator number can be read.
// NOTE(review): the anchor entry itself is not deleted here - confirm intended.
// Always returns 0.
int32_t streamStateClear(SStreamState* pState) {
  SWinKey anchor = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &anchor, NULL, 0);

  for (;;) {
    // A fresh cursor per round: the previous round's delete changed the table.
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &anchor);
    SWinKey          victim = {0};
    int32_t          code = streamStateGetKVByCur(pCur, &victim, NULL, 0);
    streamStateFreeCur(pCur);

    if (code != 0) {
      break;
    }
    streamStateDel(pState, &victim);
  }

  return 0;
}

// Sets the operator number that the state/session helpers combine with the
// caller's key (see SStateKey / SStateSessionKey).
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

5
54liuyao 已提交
295 296
// todo refactor
// Deletes key from the fill-state table within the current transaction.
// Returns the result of tdbTbDelete.
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbDelete(pTdb->pFillStateDb, key, sizeof(SWinKey), pTdb->txn);
  return code;
}

300 301 302 303 304 305
// Returns the stored value for key, or a freshly zeroed buffer if none exists.
//
//   pVLen - in: size of the buffer to create when the key is missing;
//           out: written by streamStateGet when the key exists
//   pVal  - out: the stored value, or a new zero-filled buffer of the requested
//           size obtained from tdbRealloc
//
// Nothing is written to the table here. Returns 0 on success and -1 if the new
// buffer cannot be allocated (in which case *pVal is set to NULL).
// NOTE(review): the buffer is presumably released via streamStateReleaseBuf /
// streamFreeVal - confirm with callers.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  *pVal = tdbRealloc(NULL, size);
  if (*pVal == NULL) {
    // Allocation failed: do not memset through a NULL pointer.
    return -1;
  }
  memset(*pVal, 0, size);
  return 0;
}

// Releases a value buffer through streamFreeVal. A NULL pVal is ignored.
// pState and key are not used. Always returns 0.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  if (pVal != NULL) {
    streamFreeVal(pVal);
  }
  return 0;
}

320
// Opens a cursor on the state table positioned exactly at
// (key, pState->number).
// Returns NULL if the cursor cannot be allocated or opened, if the seek fails,
// or if the seek reports a non-zero comparison result (no exact match).
// The caller releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // Check the open result, as the other seek helpers in this file do; otherwise
  // a failed open would feed an unopened cursor to tdbTbcMoveTo.
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t   c = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  pCur->number = pState->number;
  return pCur;
}

5
54liuyao 已提交
336 337 338
// Opens a cursor on the fill-state table positioned exactly at key.
// Returns NULL if the cursor cannot be allocated or opened, if the seek fails,
// or if the seek reports a non-zero comparison result (no exact match).
// The caller releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // Check the open result, as the other seek helpers in this file do; otherwise
  // a failed open would feed an unopened cursor to tdbTbcMoveTo.
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
}

// Opens a fill-state cursor exactly at key and verifies that the entry under
// the cursor belongs to key->groupId (via streamStateGetGroupKVByCur, which may
// overwrite *key with the stored key).
// Returns the cursor on success, NULL otherwise.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
}

362
// Reads the state-table entry under pCur.
// On success copies the stored window key into *pKey, passes pVal/pVLen through
// to tdbTbcGet, and returns 0. Returns -1 if pCur is NULL, if tdbTbcGet fails,
// or if the entry's operator number differs from pCur->number.
// NOTE(review): *pVal presumably points at storage owned by tdb - confirm
// before retaining it.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) {
    return -1;
  }

  // An entry of a different operator means the cursor left our key range.
  if (pStored->opNum != pCur->number) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
}

// Reads the fill-state entry under pCur.
// On success copies the stored key into *pKey, passes pVal/pVLen through to
// tdbTbcGet, and returns 0. Returns -1 if pCur is NULL or tdbTbcGet fails.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) {
    return -1;
  }

  *pKey = *pStored;
  return 0;
}

5
54liuyao 已提交
391
// Reads the fill-state entry under pCur and accepts it only if its groupId
// equals the groupId *pKey held on entry.
// Returns 0 on a match, -1 otherwise. Note that *pKey is overwritten with the
// stored key whenever the read succeeds, even when the group does not match.
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // Remember the requested group before the read replaces *pKey.
  const uint64_t wantedGroup = pKey->groupId;
  if (streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }

  return (pKey->groupId == wantedGroup) ? 0 : -1;
}

405 406 407 408 409 410
// Fetches the first state-table key of the current operator number.
// A temporary zero key {ts = 0, groupId = 0} is inserted as an anchor, the entry
// returned by streamStateSeekKeyNext(anchor) is read into *key, and the anchor
// is deleted again. Returns the result of streamStateGetKVByCur.
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
  // todo refactor
  SWinKey anchor = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &anchor, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &anchor);
  int32_t          result = streamStateGetKVByCur(pCur, key, NULL, 0);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &anchor);
  return result;
}

416 417 418 419 420 421 422 423 424 425
// Moves pCur to the first entry of its table (tdbTbcMoveToFirst) and returns
// that call's result. pState is not used; pCur must not be NULL.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
  int32_t code = tdbTbcMoveToFirst(pCur->pCur);
  return code;
}

// Moves pCur to the last entry of its table (tdbTbcMoveToLast) and returns
// that call's result. pState is not used; pCur must not be NULL.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
  int32_t code = tdbTbcMoveToLast(pCur->pCur);
  return code;
}

426 427 428 429 430 431
// Opens a state-table cursor, seeks to (key, pState->number) and, unless the
// seek reports a positive comparison result, advances one entry.
// NOTE(review): intended to land on the entry following key; the exact meaning
// of the comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  int32_t   cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &cmp) < 0) goto _fail;

  // Step forward only when the seek did not already report cmp > 0.
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

5
54liuyao 已提交
453
// Opens a fill-state cursor, seeks to key and, unless the seek reports a
// positive comparison result, advances one entry.
// NOTE(review): intended to land on the entry following key; the exact meaning
// of the comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  int32_t cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  // Step forward only when the seek did not already report cmp > 0.
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

5
54liuyao 已提交
478
// Opens a fill-state cursor, seeks to key and, unless the seek reports a
// negative comparison result, steps back one entry.
// NOTE(review): intended to land on the entry preceding key; the exact meaning
// of the comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
  int32_t cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  // Step back only when the seek did not already report cmp < 0.
  if (cmp >= 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

// Advances pCur by one entry. Returns -1 for a NULL cursor, otherwise the
// result of tdbTbcMoveToNext. pState is not used.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
  return (pCur != NULL) ? tdbTbcMoveToNext(pCur->pCur) : -1;
}

// Moves pCur back by one entry. Returns -1 for a NULL cursor, otherwise the
// result of tdbTbcMoveToPrev. pState is not used.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
  return (pCur != NULL) ? tdbTbcMoveToPrev(pCur->pCur) : -1;
}
518
// Closes the underlying tdb cursor and frees the wrapper. NULL is a no-op.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (pCur != NULL) {
    tdbTbcClose(pCur->pCur);
    taosMemoryFree(pCur);
  }
}

// Releases a value buffer with tdbFree (the counterpart of the tdbRealloc
// allocations made in this file).
void streamFreeVal(void* val) {
  tdbFree(val);
}
5
54liuyao 已提交
527 528 529

// Upserts a value into the session-state table under (key, pState->number)
// within the current transaction. Returns the result of tdbTbUpsert.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey fullKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pSessionStateDb, &fullKey, sizeof(SStateSessionKey), value, vLen, pTdb->txn);
}

// Fetches the session whose window starts at key->win.skey.
//
// The entry located by streamStateSessionSeekKeyCurrentNext is read; it is
// accepted only if its start time equals key->win.skey. On success *key is
// replaced by the stored key, *pVLen holds the value length and *pVal receives
// a copy of the value allocated with tdbRealloc.
// NOTE(review): the copy is presumably released by the caller via
// streamFreeVal/tdbFree - confirm.
//
// Returns 0 on success, -1 if no matching session exists or the copy cannot be
// allocated (in which case *key and *pVal are left untouched).
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // Copy before the cursor is closed; tmp was handed out by the cursor.
      void* pCopy = tdbRealloc(NULL, *pVLen);
      if (pCopy == NULL) {
        // Allocation failed: do not memcpy into a NULL pointer.
        code = -1;
      } else {
        memcpy(pCopy, tmp, *pVLen);
        *key = resKey;
        *pVal = pCopy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
}

// Deletes (key, pState->number) from the session-state table within the
// current transaction. Returns the result of tdbTbDelete.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey fullKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pSessionStateDb, &fullKey, sizeof(SStateSessionKey), pTdb->txn);
}

557
// Opens a session-state cursor and seeks to (key, pState->number). The cursor
// stays where the seek left it when the comparison result is >= 0; otherwise it
// is moved back one entry.
// NOTE(review): intended to land on key itself or its predecessor; the exact
// meaning of the comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  if (cmp < 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

584 585 586 587 588 589
// Opens a session-state cursor and seeks to (key, pState->number). The cursor
// stays where the seek left it when the comparison result is <= 0; otherwise it
// is advanced one entry.
// NOTE(review): intended to land on key itself or its successor; the exact
// meaning of the comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  if (cmp > 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

5
54liuyao 已提交
612 613 614 615 616 617
// Opens a session-state cursor and seeks to (key, pState->number). The cursor
// stays where the seek left it when the comparison result is < 0; otherwise
// (including an exact match) it is advanced one entry.
// NOTE(review): intended to land strictly after key; the exact meaning of the
// comparison result is defined by tdbTbcMoveTo - confirm.
// Returns NULL on allocation or tdb failure; release with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  if (cmp >= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;

  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}

639
// Reads the session-state entry under pCur.
//
//   pKey - in: a non-zero groupId restricts the read to that group (0 accepts
//          any group); out: the stored session key on success
//   pVal/pVLen - passed through to tdbTbcGet
//
// Returns 0 on success, -1 if pCur is NULL, tdbTbcGet fails, the entry belongs
// to another operator number, or the group filter rejects it.
// NOTE(review): *pVal presumably points at storage owned by tdb - confirm
// before retaining it.
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }

  const bool sameOperator = (pStored->opNum == pCur->number);
  const bool groupAccepted = (pKey->groupId == 0 || pKey->groupId == pStored->key.groupId);
  if (!sameOperator || !groupAccepted) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
}

// Zeroes the values of the session-state entries of the current operator
// number. Starting from an all-zero key, each entry's value is memset to 0 in
// the buffer returned by the cursor and written back under the same key;
// entries are not deleted. The walk stops at the first read failure or at the
// first entry whose value length is not positive. Always returns 0.
int32_t streamStateSessionClear(SStreamState* pState) {
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &startKey);

  for (;;) {
    SSessionKey curKey = {0};  // groupId 0: accept every group
    void*       pData = NULL;
    int32_t     dataLen = 0;

    if (streamStateSessionGetKVByCur(pCur, &curKey, &pData, &dataLen) != 0 || dataLen <= 0) {
      break;
    }

    memset(pData, 0, dataLen);
    streamStateSessionPut(pState, &curKey, pData, dataLen);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
}

678 679 680 681 682 683
// Finds a stored session whose window overlaps key (per sessionRangeKeyCmpr).
//
// The entry at the seek position is tested first. If it does not overlap, one
// neighbour is tested as well: the next entry when the seek's comparison
// result is positive, the previous entry when it is negative.
// On a hit *curKey receives the stored key and 0 is returned; otherwise -1 is
// returned and *curKey is left untouched.
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  // Probe the seek position.
  SSessionKey resKey = *key;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
  bool        hit = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

  // Probe the neighbour indicated by the seek result.
  if (!hit && cmp != 0) {
    if (cmp > 0) {
      streamStateCurNext(pState, pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    hit = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
  }

  if (hit) {
    *curKey = resKey;
  }
  streamStateFreeCur(pCur);
  return hit ? 0 : -1;
}

726 727
// Finds a stored session within `gap` of key's window, or prepares a new one.
//
// The search window is [key->win.skey - gap, key->win.ekey + gap]. The entry at
// or before key is probed first, then its successor (or the entry after key if
// the first probe could not be read). When a probed entry overlaps the search
// window, its value is copied into a new buffer, the entry is deleted from the
// table, *key is left holding the stored key and 0 is returned. Otherwise *key
// is restored, the buffer is zero-filled and 1 is returned.
//
//   pVLen - in: size of the buffer to allocate; may be overwritten by the probes
//   pVal  - out: the buffer allocated with tdbRealloc
//
// Returns -1 if the buffer cannot be allocated.
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
  // todo refactor
  const SSessionKey originKey = *key;
  SSessionKey       searchKey = originKey;
  searchKey.win.skey = originKey.win.skey - gap;
  searchKey.win.ekey = originKey.win.ekey + gap;

  const int32_t valSize = *pVLen;
  void*         pBuf = tdbRealloc(NULL, valSize);
  if (pBuf == NULL) {
    return -1;
  }

  bool found = false;

  // First probe: the entry at or before the requested key.
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    found = (sessionRangeKeyCmpr(&searchKey, key) == 0);
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second probe: the following entry.
  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    found = (sessionRangeKeyCmpr(&searchKey, key) == 0);
  }

  int32_t res = 0;
  if (found) {
    // Copy out before deleting: *pVal was handed out by the cursor.
    memcpy(pBuf, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    res = 1;
    memset(pBuf, 0, valSize);
  }

  *pVal = pBuf;
  streamStateFreeCur(pCur);
  return res;
}

// Finds the stored state window that key belongs to, or prepares a new one.
//
// The entry at or before key is probed first: it matches if its window covers
// key's window, or if fn(pKeyData, <last keyDataLen bytes of the stored value>)
// returns true. Failing that, the following entry (or the entry after key if
// the first probe could not be read) is tested with fn only. On a match the
// value is copied into a new buffer, the entry is deleted from the table, *key
// is left holding the stored key and 0 is returned. Otherwise *key is restored,
// the buffer is zero-filled and 1 is returned.
//
//   pVLen - in: size of the buffer to allocate; may be overwritten by the probes
//   pVal  - out: the buffer allocated with tdbRealloc
//
// Returns -1 if the buffer cannot be allocated.
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
  const SSessionKey originKey = *key;
  const int32_t     valSize = *pVLen;
  void*             pBuf = tdbRealloc(NULL, valSize);
  if (pBuf == NULL) {
    return -1;
  }

  bool found = false;

  // First probe: the entry at or before the requested key.
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    if (key->win.skey <= originKey.win.skey && originKey.win.ekey <= key->win.ekey) {
      found = true;
    } else {
      // The state key is stored in the trailing keyDataLen bytes of the value.
      void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
      found = (fn(pKeyData, stateKey) == true);
    }
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second probe: the following entry, matched by state key only.
  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    found = (fn(pKeyData, stateKey) == true);
  }

  int32_t res = 0;
  if (found) {
    // Copy out before deleting: *pVal was handed out by the cursor.
    memcpy(pBuf, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    res = 1;
    memset(pBuf, 0, valSize);
  }

  *pVal = pBuf;
  streamStateFreeCur(pCur);
  return res;
}
5
54liuyao 已提交
829

L
Liu Jicong 已提交
830 831 832 833 834 835 836 837
// Upserts the tag blob (tag[0..tagLen)) for groupId into the partition-tag
// table within the current transaction. Returns the result of tdbTbUpsert.
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbUpsert(pTdb->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pTdb->txn);
  return code;
}

// Looks up the tag blob stored for groupId in the partition-tag table. The
// value pointer and length are delivered through tagVal/tagLen by tdbTbGet,
// whose result is returned.
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbGet(pTdb->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
  return code;
}

838
// Upserts the table name for groupId into the partition-name table within the
// current transaction. Exactly TSDB_TABLE_NAME_LEN bytes of tbname are stored.
// Returns the result of tdbTbUpsert.
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    code = tdbTbUpsert(pTdb->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, pTdb->txn);
  return code;
}

// Looks up the table name stored for groupId in the partition-name table. The
// value pointer is delivered through pVal; the length reported by tdbTbGet is
// discarded. Returns the result of tdbTbGet.
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
  STdbState* pTdb = pState->pTdbState;
  int32_t    ignoredLen;  // required by tdbTbGet, not reported to the caller

  return tdbTbGet(pTdb->pParNameDb, &groupId, sizeof(int64_t), pVal, &ignoredLen);
}

// Frees the tdb bookkeeping struct and then pState itself. The tdb handles are
// not closed here (streamStateClose and the streamStateOpen error path do
// that before calling this). A NULL pState is a no-op, matching
// streamStateFreeCur.
void streamStateDestroy(SStreamState* pState) {
  if (pState == NULL) {
    return;
  }
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

5
54liuyao 已提交
853 854 855 856 857 858 859
#if 0
// Debug helper: renders the session-state entries of the current operator
// number, starting from the first entry of the table, as
// "||s:<skey>,e:<ekey>,g:<groupId>||" records.
// Returns a buffer of at most 2048 bytes allocated with taosMemoryCalloc, or
// NULL if the cursor cannot be set up, the first entry cannot be read, or the
// buffer cannot be allocated. Output stops at the last record that fits
// completely in the buffer.
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    // len < size always holds here, so the remaining space is positive.
    int32_t n = snprintf(dumpBuf + len, (size_t)(size - len), "||s:%15" PRId64 ",e:%15" PRId64 ",g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    if (n < 0 || n >= size - len) {
      // Record did not fit: drop the partial text and stop, instead of letting
      // len run past the end of the buffer.
      dumpBuf[len] = '\0';
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
#endif