streamState.c 25.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "streamInc.h"
18
#include "tcommon.h"
19
#include "tcompare.h"
20 21
#include "ttimer.h"

22 23 24 25 26 27
// todo refactor
// Key layout used for the "state.db" table: a window key plus the number of
// the state user it belongs to. Ordered by stateKeyCmpr (opNum, ts, groupId).
typedef struct SStateKey {
  SWinKey key;    // caller-supplied window key
  int64_t opNum;  // filled from SStreamState::number by the put/get/del helpers
} SStateKey;

5
54liuyao 已提交
28 29 30 31 32
// Key layout used for the "session.state.db" table: a session key plus the
// number of the state user it belongs to. Ordered by stateSessionKeyCmpr.
typedef struct SStateSessionKey {
  SSessionKey key;    // caller-supplied session key
  int64_t     opNum;  // filled from SStreamState::number by the session helpers
} SStateSessionKey;

33
static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Strict ordering of session keys: group id, then window start, then window end.
// Returns 1, -1 or 0.
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
  if (pWin1->groupId != pWin2->groupId) {
    return (pWin1->groupId > pWin2->groupId) ? 1 : -1;
  }
  if (pWin1->win.skey != pWin2->win.skey) {
    return (pWin1->win.skey > pWin2->win.skey) ? 1 : -1;
  }
  if (pWin1->win.ekey != pWin2->win.ekey) {
    return (pWin1->win.ekey > pWin2->win.ekey) ? 1 : -1;
  }
  return 0;
}

5
54liuyao 已提交
71 72 73 74 75 76 77 78 79 80
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

81
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
82 83
}

84
// Key comparator handed to tdbTbOpen for "state.db": orders by opNum, then
// timestamp, then group id. The length arguments are not used.
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  const SStateKey* lhs = (const SStateKey*)pKey1;
  const SStateKey* rhs = (const SStateKey*)pKey2;

  if (lhs->opNum != rhs->opNum) {
    return (lhs->opNum > rhs->opNum) ? 1 : -1;
  }
  if (lhs->key.ts != rhs->key.ts) {
    return (lhs->key.ts > rhs->key.ts) ? 1 : -1;
  }
  if (lhs->key.groupId != rhs->key.groupId) {
    return (lhs->key.groupId > rhs->key.groupId) ? 1 : -1;
  }
  return 0;
}

109 110 111
// Opens the tdb-backed state store of a stream task and begins a transaction.
//
//   path      base directory; when specPath is false the store lives in
//             "<path>/<taskId>", otherwise path is used unchanged
//   pTask     task recorded as pTdbState->pOwner; its taskId is read only
//             when specPath is false
//   szPage    forwarded to tdbOpen; a negative value selects 4096
//   pages     forwarded to tdbOpen; a negative value selects 256
//
// Returns the new state, or NULL on failure (allocation failure sets terrno
// to TSDB_CODE_OUT_OF_MEMORY). Release the result with streamStateClose.
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }

  char statePath[1024];
  if (!specPath) {
    // Bounded write: an over-long base path must not overrun statePath, and a
    // truncated path must not be opened as if it were the requested one.
    int32_t n = snprintf(statePath, sizeof(statePath), "%s/%d", path, pTask->taskId);
    if (n < 0 || n >= (int32_t)sizeof(statePath)) {
      streamStateDestroy(pState);
      return NULL;
    }
  } else {
    memset(statePath, 0, sizeof(statePath));
    tstrncpy(statePath, path, sizeof(statePath));
  }
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) {
    goto _err;
  }

  // open state storage backend
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
    goto _err;
  }

  // todo refactor
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
    goto _err;
  }

  if (streamStateBegin(pState) < 0) {
    goto _err;
  }

  pState->pTdbState->pOwner = pTask;

  return pState;

_err:
  // Tables that were never opened are still NULL here (pTdbState is calloc'ed).
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
  return NULL;
}

// Commits the open transaction, closes all five tables and the database,
// then frees pState. Return values of the tdb calls are ignored.
void streamStateClose(SStreamState* pState) {
  // flush the pending transaction
  (void)tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn);
  (void)tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn);

  // tables first, database last
  (void)tdbTbClose(pState->pTdbState->pStateDb);
  (void)tdbTbClose(pState->pTdbState->pFuncStateDb);
  (void)tdbTbClose(pState->pTdbState->pFillStateDb);
  (void)tdbTbClose(pState->pTdbState->pSessionStateDb);
  (void)tdbTbClose(pState->pTdbState->pParNameDb);
  (void)tdbClose(pState->pTdbState->db);

  streamStateDestroy(pState);
}

// Opens a write/read-uncommitted transaction on pState and begins it.
// Returns 0 on success, -1 on failure; the transaction is closed again when
// tdbBegin fails.
int32_t streamStateBegin(SStreamState* pState) {
  int32_t code = tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
                            TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code < 0) {
    return -1;
  }

  code = tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn);
  if (code >= 0) {
    return 0;
  }

  tdbTxnClose(&pState->pTdbState->txn);
  return -1;
}

// Commits the current transaction and immediately starts a fresh one so the
// state stays writable. Returns 0 on success, -1 if any step fails.
int32_t streamStateCommit(SStreamState* pState) {
  if (tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
    return -1;
  }
  if (tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
    return -1;
  }

  // start the next transaction
  memset(&pState->pTdbState->txn, 0, sizeof(TXN));
  if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
    // Same cleanup as streamStateBegin: do not leave an opened-but-not-begun
    // transaction behind.
    tdbTxnClose(&pState->pTdbState->txn);
    return -1;
  }
  return 0;
}

// Aborts the current transaction and immediately starts a fresh one.
// Returns 0 on success, -1 if any step fails.
int32_t streamStateAbort(SStreamState* pState) {
  if (tdbAbort(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
    return -1;
  }

  // start the next transaction
  memset(&pState->pTdbState->txn, 0, sizeof(TXN));
  if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
    // Same cleanup as streamStateBegin: do not leave an opened-but-not-begun
    // transaction behind.
    tdbTxnClose(&pState->pTdbState->txn);
    return -1;
  }
  return 0;
}

240
// Upserts value (vLen bytes) under key in the function-state table, inside
// the current transaction. Returns the tdbTbUpsert result.
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  int32_t code =
      tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->pTdbState->txn);
  return code;
}
// Looks up key in the function-state table; the value and its length are
// reported through pVal / pVLen. Returns the tdbTbGet result.
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  int32_t code = tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
  return code;
}

// Deletes key from the function-state table inside the current transaction.
// Returns the tdbTbDelete result.
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
  int32_t code = tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), &pState->pTdbState->txn);
  return code;
}

251
// todo refactor
252
// Upserts value under (key, pState->number) in the state table.
// Returns the tdbTbUpsert result.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  SStateKey stateKey = {.key = *key, .opNum = pState->number};
  int32_t   code =
      tdbTbUpsert(pState->pTdbState->pStateDb, &stateKey, sizeof(SStateKey), value, vLen, &pState->pTdbState->txn);
  return code;
}
5
54liuyao 已提交
256 257 258

// todo refactor
// Upserts value under key in the fill-state table (the key is stored without
// an opNum qualifier). Returns the tdbTbUpsert result.
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int32_t code =
      tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->pTdbState->txn);
  return code;
}

262
// todo refactor
263
// Looks up (key, pState->number) in the state table; the value and its
// length are reported through pVal / pVLen. Returns the tdbTbGet result.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  SStateKey stateKey = {.key = *key, .opNum = pState->number};
  int32_t   code = tdbTbGet(pState->pTdbState->pStateDb, &stateKey, sizeof(SStateKey), pVal, pVLen);
  return code;
}

5
54liuyao 已提交
268 269
// todo refactor
// Looks up key in the fill-state table; the value and its length are
// reported through pVal / pVLen. Returns the tdbTbGet result.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int32_t code = tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
  return code;
}

273
// todo refactor
274
// Deletes (key, pState->number) from the state table inside the current
// transaction. Returns the tdbTbDelete result.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
  SStateKey stateKey = {.key = *key, .opNum = pState->number};
  int32_t   code = tdbTbDelete(pState->pTdbState->pStateDb, &stateKey, sizeof(SStateKey), &pState->pTdbState->txn);
  return code;
}

279 280 281 282 283 284 285
// Deletes the state-table entries that streamStateSeekKeyNext finds after
// the {ts = 0, groupId = 0} key for the current pState->number. That key is
// first written as an anchor and is not removed here. Always returns 0.
int32_t streamStateClear(SStreamState* pState) {
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);

  for (;;) {
    // Re-seek from the anchor each round: the previous successor was deleted.
    SWinKey          victim = {0};
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
    int32_t          code = streamStateGetKVByCur(pCur, &victim, NULL, 0);
    streamStateFreeCur(pCur);
    if (code != 0) {
      break;
    }
    streamStateDel(pState, &victim);
  }
  return 0;
}

// Sets the number that the state/session helpers store as opNum in their keys.
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

5
54liuyao 已提交
298 299
// todo refactor
// Deletes key from the fill-state table inside the current transaction.
// Returns the tdbTbDelete result.
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
  int32_t code = tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), &pState->pTdbState->txn);
  return code;
}

303 304 305 306 307 308
// Fetches the value stored under key, or hands back a zero-filled buffer of
// the requested size when the key is absent.
//
//   pVLen  in: size of the buffer to create when the key is missing;
//          out: written by streamStateGet when the key exists
//   pVal   out: the stored value or the new zeroed buffer
//
// Returns 0 on success, -1 if the new buffer cannot be allocated.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  void* buf = tdbRealloc(NULL, size);
  if (buf == NULL && size > 0) {
    // Allocation failed: report it instead of zero-filling a NULL pointer.
    *pVal = NULL;
    return -1;
  }
  if (buf != NULL) {
    memset(buf, 0, size);
  }
  *pVal = buf;
  return 0;
}

// Frees a value buffer through streamFreeVal; a NULL pVal is ignored.
// pState and key are not used. Always returns 0.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  if (pVal != NULL) {
    streamFreeVal(pVal);
  }
  return 0;
}

323
// Creates a state-table cursor positioned on (key, pState->number).
// Returns NULL on allocation failure, when the cursor cannot be opened or
// moved, or when tdbTbcMoveTo reports a non-zero comparison result
// (presumably: no exact match). Free the cursor with streamStateFreeCur.
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
  pCur->number = pState->number;

  // Check the open like the seek helpers do, so a failure is not passed on to tdbTbcMoveTo.
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t   c = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  // A failed move must not be mistaken for a hit just because c is still 0.
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

5
54liuyao 已提交
339 340 341
// Creates a fill-state-table cursor positioned on key.
// Returns NULL on allocation failure, when the cursor cannot be opened or
// moved, or when tdbTbcMoveTo reports a non-zero comparison result
// (presumably: no exact match). Free the cursor with streamStateFreeCur.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // Check the open like the seek helpers do, so a failure is not passed on to tdbTbcMoveTo.
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  // A failed move must not be mistaken for a hit just because c is still 0.
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

// Returns a fill-state cursor on key, but only if the entry under the cursor
// passes streamStateGetGroupKVByCur (same group id as key); otherwise NULL.
// On success *key is overwritten with the stored key.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

365
// Reads the entry under a state-table cursor. Copies the window key to *pKey
// and reports the value through pVal / pVLen.
// Returns -1 for a NULL cursor, a failed tdbTbcGet, or an entry whose opNum
// differs from the cursor's number; 0 otherwise.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  // the cursor has moved past the entries of this number
  if (pStored->opNum != pCur->number) return -1;

  *pKey = pStored->key;
  return 0;
}

// Reads the entry under a fill-state-table cursor. Copies the key to *pKey
// and reports the value through pVal / pVLen.
// Returns -1 for a NULL cursor or a failed tdbTbcGet, 0 otherwise.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  *pKey = *pStored;
  return 0;
}

5
54liuyao 已提交
394
// Like streamStateFillGetKVByCur, but succeeds only when the stored entry has
// the group id that pKey carried on entry. *pKey is overwritten whenever the
// read itself succeeds, even if the group check then fails.
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;

  const uint64_t wantedGroup = pKey->groupId;
  if (streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen) != 0) return -1;

  return (pKey->groupId == wantedGroup) ? 0 : -1;
}

408 409 410 411 412 413 414 415 416 417
// Copies to *key the state-table entry that streamStateSeekKeyNext finds
// after the {ts = 0, groupId = 0} key, for the current pState->number. That
// key is written temporarily as an anchor and deleted again before returning.
// Returns the streamStateGetKVByCur result (0 when an entry was found).
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
  int32_t          code = streamStateGetKVByCur(pCur, key, NULL, 0);
  // Release the cursor; it was previously leaked on every call.
  streamStateFreeCur(pCur);
  streamStateDel(pState, &tmp);
  return code;
}

418 419 420 421 422 423 424 425 426 427
// Moves pCur to the first entry of its table. pState is not used.
// Returns -1 for a NULL cursor (same convention as streamStateCurNext),
// otherwise the tdbTbcMoveToFirst result.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
  if (!pCur) {
    return -1;
  }
  return tdbTbcMoveToFirst(pCur->pCur);
}

// Moves pCur to the last entry of its table. pState is not used.
// Returns -1 for a NULL cursor (same convention as streamStateCurPrev),
// otherwise the tdbTbcMoveToLast result.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
  if (!pCur) {
    return -1;
  }
  return tdbTbcMoveToLast(pCur->pCur);
}

428 429 430 431 432 433
// Creates a state-table cursor and seeks to (key, pState->number). When
// tdbTbcMoveTo reports c > 0 the cursor is returned where it landed,
// otherwise it is advanced by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  int32_t   c = 0;
  SStateKey target = {.key = *key, .opNum = pState->number};

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateKey), &c) < 0 ||
      (c <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

5
54liuyao 已提交
455
// Creates a fill-state-table cursor and seeks to key. When tdbTbcMoveTo
// reports c > 0 the cursor is returned where it landed, otherwise it is
// advanced by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  int32_t c = 0;

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 ||
      (c <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

5
54liuyao 已提交
480
// Creates a fill-state-table cursor and seeks to key. When tdbTbcMoveTo
// reports c < 0 the cursor is returned where it landed, otherwise it is
// moved back by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;

  int32_t c = 0;

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 ||
      (c >= 0 && tdbTbcMoveToPrev(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

// Advances pCur by one entry. pState is not used.
// Returns -1 for a NULL cursor, otherwise the tdbTbcMoveToNext result.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
  return (pCur != NULL) ? tdbTbcMoveToNext(pCur->pCur) : -1;
}

// Moves pCur back by one entry. pState is not used.
// Returns -1 for a NULL cursor, otherwise the tdbTbcMoveToPrev result.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
  return (pCur != NULL) ? tdbTbcMoveToPrev(pCur->pCur) : -1;
}
520
// Closes the underlying tdb cursor and frees the wrapper. NULL is accepted.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (pCur != NULL) {
    tdbTbcClose(pCur->pCur);
    taosMemoryFree(pCur);
  }
}

// Releases a value buffer with tdbFree.
void streamFreeVal(void* val) {
  tdbFree(val);
}
5
54liuyao 已提交
529 530 531

// Upserts value under (key, pState->number) in the session-state table.
// Returns the tdbTbUpsert result.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  SStateSessionKey sessKey = {.key = *key, .opNum = pState->number};
  int32_t          code = tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sessKey, sizeof(SStateSessionKey), value,
                                      vLen, &pState->pTdbState->txn);
  return code;
}

// Finds the session entry located by streamStateSessionSeekKeyCurrentNext
// and, if its window start equals key->win.skey, returns a private copy of
// its value.
//
//   key    in: key to look up; out: the stored key on success
//   pVal   out: copy of the value, allocated with tdbRealloc
//   pVLen  out: value length as reported by the cursor
//
// Returns 0 on success, -1 when no matching entry exists or the copy cannot
// be allocated (key and *pVal are left untouched in that case).
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // tmp points at cursor-owned data, so hand the caller its own copy.
      void* copy = tdbRealloc(NULL, *pVLen);
      if (copy == NULL && *pVLen > 0) {
        // Allocation failed: report it instead of copying into NULL.
        code = -1;
      } else {
        if (copy != NULL) {
          memcpy(copy, tmp, *pVLen);
        }
        *key = resKey;
        *pVal = copy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
}

// Deletes (key, pState->number) from the session-state table inside the
// current transaction. Returns the tdbTbDelete result.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sessKey = {.key = *key, .opNum = pState->number};
  int32_t          code =
      tdbTbDelete(pState->pTdbState->pSessionStateDb, &sessKey, sizeof(SStateSessionKey), &pState->pTdbState->txn);
  return code;
}

559
// Creates a session-state cursor and seeks to (key, pState->number). When
// tdbTbcMoveTo reports c >= 0 the cursor is returned where it landed,
// otherwise it is moved back by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  int32_t          c = 0;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) < 0 ||
      (c < 0 && tdbTbcMoveToPrev(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

586 587 588 589 590 591
// Creates a session-state cursor and seeks to (key, pState->number). When
// tdbTbcMoveTo reports c <= 0 the cursor is returned where it landed,
// otherwise it is advanced by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  int32_t          c = 0;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) < 0 ||
      (c > 0 && tdbTbcMoveToNext(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

5
54liuyao 已提交
614 615 616 617 618 619
// Creates a session-state cursor and seeks to (key, pState->number). When
// tdbTbcMoveTo reports c < 0 the cursor is returned where it landed,
// otherwise it is advanced by one entry first.
// Returns NULL on allocation failure or when any cursor operation fails.
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  int32_t          c = 0;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};

  // Short-circuit keeps the original order: open, seek, then the optional step.
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) < 0 ||
      (c >= 0 && tdbTbcMoveToNext(pCur->pCur) < 0)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}

641
// Reads the entry under a session-state cursor. On success *pKey receives the
// stored session key and the value is reported through pVal / pVLen.
// Returns -1 for a NULL cursor, a failed tdbTbcGet, an entry whose opNum
// differs from the cursor's number, or — when pKey->groupId is non-zero on
// entry — an entry of a different group. Returns 0 otherwise.
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) return -1;

  if (pStored->opNum != pCur->number) return -1;

  // A group id of 0 in the request skips the group check.
  if (pKey->groupId != 0 && pKey->groupId != pStored->key.groupId) return -1;

  *pKey = pStored->key;
  return 0;
}

// Walks the session-state entries of the current pState->number, starting at
// the all-zero key, and overwrites each value with zeros of the same length.
// Entries are rewritten, not deleted. Always returns 0.
int32_t streamStateSessionClear(SStreamState* pState) {
  SSessionKey      key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);

  for (;;) {
    SSessionKey curKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
    if (streamStateSessionGetKVByCur(pCur, &curKey, &buf, &size) != 0) {
      break;
    }

    ASSERT(size > 0);
    // NOTE(review): buf is the pointer handed out by tdbTbcGet and is zeroed in place.
    memset(buf, 0, size);
    streamStateSessionPut(pState, &curKey, buf, size);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
}

681 682 683 684 685 686
// Looks for a stored session key whose window overlaps key's window
// (sessionRangeKeyCmpr == 0). The entry the seek lands on is checked first.
// If it does not match and tdbTbcMoveTo reported c != 0, one neighbour is
// checked too: the next entry for c > 0, the previous one for c < 0.
// On a match *curKey receives the stored key and 0 is returned; otherwise -1.
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return -1;
  pCur->number = pState->number;

  int32_t          result = -1;
  int32_t          c = 0;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  SSessionKey      resKey = *key;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) >= 0 &&
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) >= 0) {
    // entry the seek landed on
    int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    int32_t hit = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

    // one neighbour, in the direction indicated by c
    if (!hit && c != 0) {
      if (c > 0) {
        streamStateCurNext(pState, pCur);
      } else {
        streamStateCurPrev(pState, pCur);
      }
      code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
      hit = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
    }

    if (hit) {
      *curKey = resKey;
      result = 0;
    }
  }

  streamStateFreeCur(pCur);
  return result;
}

729 730
// Finds a stored session window that overlaps key's window widened by gap on
// both sides, checking the entry at/before key and then the following one.
//
//   found:     *key becomes the stored key, the stored value is copied into
//              a new buffer, the stored entry is deleted, and 0 is returned
//   not found: *key is left as passed in, the new buffer is zero-filled, and
//              1 is returned
//
// In both cases *pVal receives the new buffer of the size *pVLen had on
// entry. Returns -1 if that buffer cannot be allocated.
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
  // todo refactor
  int32_t     found = 0;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;

  int32_t valSize = *pVLen;
  void*   outBuf = tdbRealloc(NULL, valSize);
  if (outBuf == NULL) {
    return -1;
  }

  // first candidate: the entry at or before key
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      found = 1;
    } else {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // second candidate: the following entry
  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0 &&
      sessionRangeKeyCmpr(&searchKey, key) == 0) {
    found = 1;
  }

  if (found) {
    memcpy(outBuf, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    memset(outBuf, 0, valSize);
  }

  *pVal = outBuf;
  streamStateFreeCur(pCur);
  return found ? 0 : 1;
}

// Finds a stored state window that key belongs to. The entry at/before key
// matches if its window contains key's window, or if fn() returns true for
// pKeyData against the last keyDataLen bytes of the stored value. The
// following entry is checked with fn() only.
//
//   found:     *key becomes the stored key, the stored value is copied into
//              a new buffer, the stored entry is deleted, and 0 is returned
//   not found: *key is left as passed in, the new buffer is zero-filled, and
//              1 is returned
//
// In both cases *pVal receives the new buffer of the size *pVLen had on
// entry. Returns -1 if that buffer cannot be allocated.
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
  int32_t     found = 0;
  SSessionKey originKey = *key;
  int32_t     valSize = *pVLen;
  void*       outBuf = tdbRealloc(NULL, valSize);
  if (outBuf == NULL) {
    return -1;
  }

  // first candidate: the entry at or before key
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    if (key->win.skey <= originKey.win.skey && originKey.win.ekey <= key->win.ekey) {
      found = 1;
    } else {
      // the state key sits in the last keyDataLen bytes of the value
      void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
      if (fn(pKeyData, stateKey) == true) {
        found = 1;
      } else {
        streamStateCurNext(pState, pCur);
      }
    }
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // second candidate: the following entry
  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      found = 1;
    }
  }

  if (found) {
    memcpy(outBuf, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    memset(outBuf, 0, valSize);
  }

  *pVal = outBuf;
  streamStateFreeCur(pCur);
  return found ? 0 : 1;
}
5
54liuyao 已提交
832

833
// Stores tbname (TSDB_TABLE_NAME_LEN bytes) under groupId in the
// "parname.state.db" table inside the current transaction.
// Returns the tdbTbUpsert result, so a failed write is reported to the
// caller like in the other put helpers.
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     &pState->pTdbState->txn);
}

// Looks up the name stored under groupId in the "parname.state.db" table and
// reports it through pVal; the value length is discarded.
// Returns the tdbTbGet result.
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
  int32_t valLen;
  int32_t code = tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &valLen);
  return code;
}

// Frees the STdbState holder and the state object itself. It does not close
// any tdb handle (see streamStateClose for that). NULL is accepted, matching
// streamStateFreeCur.
void streamStateDestroy(SStreamState* pState) {
  if (pState == NULL) {
    return;
  }
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

5
54liuyao 已提交
849 850 851 852 853 854 855
#if 0
// Debug helper: renders the session-state keys, from the first table entry
// onward, as "||s:<skey>,e:<ekey>,g:<groupId>||" records in a 2048-byte
// buffer. Output stops at the first entry streamStateSessionGetKVByCur
// rejects, or when the buffer is full (the last record may be cut short).
// Returns the buffer, or NULL if nothing could be read or allocated.
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    // One call per record; the adjacent literals concatenate to the same text
    // the three separate calls produced.
    int32_t n = snprintf(dumpBuf + len, size - len,
                         "||s:%15" PRId64 ","
                         "e:%15" PRId64 ","
                         "g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    if (n < 0) {
      dumpBuf[len] = '\0';
      break;
    }
    if (n >= size - len) {
      // Buffer full: snprintf already terminated the truncated record. Stop
      // here so len never exceeds size and no write lands out of bounds.
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
#endif