streamState.c 31.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
// Key comparator for SStateKey records (passed to tdbTbOpen for "state.db").
// Fields are compared in the order opNum, key.ts, key.groupId; the first
// differing field decides the result. The length arguments are unused.
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  const SStateKey* pLhs = (const SStateKey*)pKey1;
  const SStateKey* pRhs = (const SStateKey*)pKey2;

  if (pLhs->opNum != pRhs->opNum) {
    return (pLhs->opNum > pRhs->opNum) ? 1 : -1;
  }

  if (pLhs->key.ts != pRhs->key.ts) {
    return (pLhs->key.ts > pRhs->key.ts) ? 1 : -1;
  }

  if (pLhs->key.groupId != pRhs->key.groupId) {
    return (pLhs->key.groupId > pRhs->key.groupId) ? 1 : -1;
  }

  return 0;
}

105
// Opens (creating it if needed) the state store of a stream task.
//
//   path     base directory. When specPath is false, "<path>/<taskId>" is used as the
//            state directory; otherwise path itself is used.
//   pTask    owning task; stored in pTdbState->pOwner on success.
//   szPage   page size / page count handed to tdbOpen() in the TDB build. An existing
//   pages    "<statePath>/cfg" file overrides both; when the file has to be created,
//            negative values are replaced by 4096 / 256 before being written to it.
//
// Returns the new state object, or NULL on failure.
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qWarn("open stream state, %s", path);
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }

  char statePath[1024];
  if (!specPath) {
    // Bounded formatting: an over-long path is truncated instead of overflowing the buffer.
    snprintf(statePath, sizeof(statePath), "%s/%d", path, pTask->taskId);
  } else {
    memset(statePath, 0, sizeof(statePath));
    tstrncpy(statePath, path, sizeof(statePath));
  }
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    // Bail out here: continuing would dereference the state that was just released.
    taosMemoryFree(pState->pTdbState);
    taosMemoryFree(pState);
    return NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
  return pState;

#else

  char cfgPath[1030];
  snprintf(cfgPath, sizeof(cfgPath), "%s/cfg", statePath);

  char cfg[1024];
  memset(cfg, 0, sizeof(cfg));
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    int64_t size = 0;
    taosFStatFile(pCfgFile, &size, NULL);
    // Never read more than the buffer holds, and keep the trailing NUL for sscanf.
    if (size < 0) {
      size = 0;
    } else if (size > (int64_t)sizeof(cfg) - 1) {
      size = (int64_t)sizeof(cfg) - 1;
    }
    taosReadFile(pCfgFile, cfg, size);
    sscanf(cfg, "%d\n%d\n", &szPage, &pages);
  } else {
    taosMulModeMkDir(statePath, 0755);
    pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
    szPage = szPage < 0 ? 4096 : szPage;
    pages = pages < 0 ? 256 : pages;
    if (pCfgFile != NULL) {
      snprintf(cfg, sizeof(cfg), "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
  }
  taosCloseFile(&pCfgFile);

  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
    goto _err;
  }

  // open state storage backend
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
    goto _err;
  }

  // todo refactor
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

  if (streamStateBegin(pState) < 0) {
    goto _err;
  }

  pState->pTdbState->pOwner = pTask;

  return pState;

_err:
  // Tables that were never opened are still NULL here because pTdbState was calloc'ed.
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbTbClose(pState->pTdbState->pParTagDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
  return NULL;
#endif
}

// Closes a state store and releases the state object.
// TDB build: commits the current transaction, closes every table and the
// database, then destroys the state. RocksDB build: only destroys the state.
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
  // streamCleanBackend(pState);
#else
  STdbState* pTdb = pState->pTdbState;

  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);
#endif
  streamStateDestroy(pState);
}

// Starts a new write transaction on the state database (TDB build).
// Returns 0 on success; on failure the transaction is aborted and -1 is returned.
// The RocksDB build is a no-op that returns 0.
int32_t streamStateBegin(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;
  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) >= 0) {
    return 0;
  }

  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
#endif
}

// Commits the current transaction and immediately begins a new one (TDB build).
// The three steps (commit, post-commit, begin) run in order; the first failing
// step stops the sequence and -1 is returned. Returns 0 when all succeed.
// The RocksDB build is a no-op that returns 0.
int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  // Short-circuit evaluation keeps the original step order and early exit.
  if (tdbCommit(pTdb->db, pTdb->txn) < 0 || tdbPostCommit(pTdb->db, pTdb->txn) < 0 ||
      tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }
  return 0;
#endif
}

// Aborts the current transaction and begins a new one (TDB build).
// Returns -1 as soon as either step fails, 0 otherwise.
// The RocksDB build is a no-op that returns 0.
int32_t streamStateAbort(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  // Short-circuit evaluation: begin is only attempted after a successful abort.
  if (tdbAbort(pTdb->db, pTdb->txn) < 0 ||
      tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }
  return 0;
#endif
}

282
// Inserts or overwrites the value stored under `key` in the function-state table.
// Returns the result of the backend call.
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFuncStateDb, key, sizeof(*key), value, vLen, pTdb->txn);
#endif
}
// Looks up the value stored under `key` in the function-state table.
// The value and its length are handed back through pVal / pVLen.
// Returns the result of the backend call.
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  // Dispatch to the RocksDB implementation; calling this function itself here
  // would recurse without end.
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
#else
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
#endif
}

// Removes the entry stored under `key` from the function-state table.
// Returns the result of the backend call.
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFuncStateDb, key, sizeof(*key), pTdb->txn);
#endif
}

305
// todo refactor
306
// Inserts or overwrites a value in the main state table. In the TDB build the
// stored key is the window key combined with the state's current operator number.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStatePut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  fullKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pStateDb, &fullKey, sizeof(fullKey), value, vLen, pTdb->txn);
#endif
}
5
54liuyao 已提交
314 315

// todo refactor
dengyihao's avatar
dengyihao 已提交
316
// Looks up a value in the main state table. In the TDB build the lookup key is
// the window key combined with the state's current operator number.
// The value and its length are handed back through pVal / pVLen.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
  SStateKey fullKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &fullKey, sizeof(fullKey), pVal, pVLen);
#endif
}
324
// todo refactor
dengyihao's avatar
dengyihao 已提交
325
// Deletes an entry from the main state table. In the TDB build the deleted key
// is the window key combined with the state's current operator number.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  fullKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pStateDb, &fullKey, sizeof(fullKey), pTdb->txn);
#endif
}

// todo refactor
// Inserts or overwrites a value in the fill-state table, keyed directly by the
// window key (no operator number is added).
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(*key), value, vLen, pTdb->txn);
#endif
}

5
54liuyao 已提交
343 344
// todo refactor
// Looks up a value in the fill-state table, keyed directly by the window key.
// The value and its length are handed back through pVal / pVLen.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbGet(pTdb->pFillStateDb, key, sizeof(*key), pVal, pVLen);
#endif
}

352
// todo refactor
dengyihao's avatar
dengyihao 已提交
353
// Deletes an entry from the fill-state table, keyed directly by the window key.
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFillStateDb, key, sizeof(*key), pTdb->txn);
#endif
}

361
// Deletes entries of the main state table for the current operator number.
// TDB build: a {ts = 0, groupId = 0} entry is written first and used as the seek
// origin; the loop then repeatedly seeks from it, deletes the entry the cursor
// yields, and stops as soon as the cursor yields nothing. Always returns 0.
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateClear_rocksdb(pState);
#else
  SWinKey origin = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &origin, NULL, 0);

  for (;;) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &origin);
    SWinKey          victim = {0};
    int32_t          code = streamStateGetKVByCur(pCur, &victim, NULL, NULL);
    streamStateFreeCur(pCur);
    if (code != 0) {
      break;
    }
    streamStateDel(pState, &victim);
  }
  return 0;
#endif
}

// Sets the operator number that the state functions in this file combine with
// window/session keys (as `opNum`) when addressing the state tables.
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

384
// Returns the value stored under `key`, or a fresh zero-filled buffer when the
// key is absent.
//
//   pVLen  in: size of the buffer to allocate when the key does not exist;
//          it is also passed on to streamStateGet for the lookup.
//   pVal   out: the stored value, or the newly allocated zeroed buffer.
//
// Returns 0 on success, -1 (with terrno set) when the buffer cannot be allocated.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
#else
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  void* pBuf = tdbRealloc(NULL, size);
  if (pBuf == NULL && size != 0) {
    // Allocation failed: report it instead of writing through a NULL pointer.
    *pVal = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (pBuf != NULL) {
    memset(pBuf, 0, size);
  }
  *pVal = pBuf;
  return 0;
#endif
}

// Releases a value buffer previously handed out by the state store.
// A NULL buffer is ignored. pState and key are unused. Always returns 0.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  if (pVal != NULL) {
#ifdef USE_ROCKSDB
    taosMemoryFree(pVal);
#else
    streamFreeVal(pVal);
#endif
  }
  return 0;
}

412
// Creates a cursor on the main state table positioned exactly on `key`
// (combined with the state's current operator number).
// Returns NULL when the cursor cannot be allocated or opened, when the seek
// fails, or when the seek does not report an exact match (c != 0).
// The caller releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // Check the open result, as the seek helpers in this file do, so a failed
  // open never reaches tdbTbcMoveTo with an unusable cursor.
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t   c = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  pCur->number = pState->number;
  return pCur;
#endif
}

5
54liuyao 已提交
432
// Creates a cursor on the fill-state table positioned exactly on `key`.
// Returns NULL when the cursor cannot be allocated or opened, when the seek
// fails, or when the seek does not report an exact match (c != 0).
// The caller releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // Check the open result, as the seek helpers in this file do, so a failed
  // open never reaches tdbTbcMoveTo with an unusable cursor.
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

// Returns a fill-state cursor positioned on `key`, but only if the entry under
// the cursor belongs to the same group as `key`. On success `key` is overwritten
// with the stored key (via streamStateGetGroupKVByCur). Returns NULL otherwise.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, NULL) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

466
// Reads the key/value under a main-state cursor.
// Returns -1 when pCur is NULL, when the cursor read fails, or when the stored
// entry's opNum differs from the cursor's number. On success the window key is
// copied into *pKey, the value is reported through pVal / pVLen, and 0 is returned.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0 ||
      pStored->opNum != pCur->number) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

// Reads the key/value under a fill-state cursor.
// Returns -1 when pCur is NULL or the cursor read fails. On success the stored
// key is copied into *pKey, the value is reported through pVal / pVLen, and 0
// is returned.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) {
    return -1;
  }

  *pKey = *pStored;
  return 0;
#endif
}

5
54liuyao 已提交
503
// Reads the entry under a fill-state cursor and accepts it only if it belongs
// to the group that *pKey named on entry. *pKey is overwritten with the stored
// key whenever the read itself succeeds. Returns 0 on a group match, -1 otherwise.
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  // Remember the requested group before the read overwrites *pKey.
  uint64_t wantedGroup = pKey->groupId;
  int32_t  code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
  return (code == 0 && pKey->groupId == wantedGroup) ? 0 : -1;
#endif
}

521
// Fetches the key that a seek from {ts = 0, groupId = 0} yields for the current
// operator number. TDB build: a temporary {0, 0} entry is written, used as the
// seek origin and deleted again before returning. Returns the result of the
// cursor read (0 on success).
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
  // todo refactor
  SWinKey origin = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &origin, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &origin);
  int32_t          result = streamStateGetKVByCur(pCur, key, NULL, NULL);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &origin);
  return result;
#endif
}

536
// Moves the cursor to the first entry. The RocksDB build always returns 0;
// the TDB build returns the result of tdbTbcMoveToFirst. pState is unused.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
#else
  code = tdbTbcMoveToFirst(pCur->pCur);
#endif
  return code;
}

// Moves the cursor to the last entry. The RocksDB build always returns 0;
// the TDB build returns the result of tdbTbcMoveToLast. pState is unused.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
#else
  code = tdbTbcMoveToLast(pCur->pCur);
#endif
  return code;
}

554
// Creates a main-state cursor and seeks to `key` (combined with the current
// operator number). If the seek reports c > 0 the cursor is returned where it
// landed; otherwise it is advanced by one entry first.
// Returns NULL when allocation, open, seek or the advance fails.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateKey target = {.key = *key, .opNum = pState->number};
  int32_t   c = 0;

  // Evaluated left to right: open, seek, then (only when c <= 0) step forward.
  bool ok = tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateKey), &c) >= 0 &&
            (c > 0 || tdbTbcMoveToNext(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

5
54liuyao 已提交
585
// Creates a fill-state cursor and seeks to `key`. If the seek reports c > 0 the
// cursor is returned where it landed; otherwise it is advanced by one entry first.
// Returns NULL when allocation, open, seek or the advance fails.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  int32_t c = 0;

  // Evaluated left to right: open, seek, then (only when c <= 0) step forward.
  bool ok = tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) >= 0 &&
            (c > 0 || tdbTbcMoveToNext(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

5
54liuyao 已提交
614
// Creates a fill-state cursor and seeks to `key`. If the seek reports c < 0 the
// cursor is returned where it landed; otherwise it is moved back by one entry first.
// Returns NULL when allocation, open, seek or the step back fails.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  int32_t c = 0;

  // Evaluated left to right: open, seek, then (only when c >= 0) step back.
  bool ok = tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) >= 0 &&
            (c < 0 || tdbTbcMoveToPrev(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

// Advances the cursor by one entry. In the TDB build a NULL cursor yields -1;
// otherwise the result of tdbTbcMoveToNext is returned. pState is unused there.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
  return (pCur != NULL) ? tdbTbcMoveToNext(pCur->pCur) : -1;
#endif
}

// Moves the cursor back by one entry. In the TDB build a NULL cursor yields -1;
// otherwise the result of tdbTbcMoveToPrev is returned. pState is unused there.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
  return (pCur != NULL) ? tdbTbcMoveToPrev(pCur->pCur) : -1;
#endif
}
665
// Releases a cursor created by the seek/get-cursor functions in this file:
// destroys the RocksDB iterator if one is attached, closes the TDB cursor and
// frees the cursor object. A NULL cursor is ignored.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }
  // Cursors built on the TDB path are calloc'ed and never get an iterator;
  // rocksdb_iter_destroy() dereferences its argument, so skip it for NULL.
  if (pCur->iter != NULL) {
    rocksdb_iter_destroy(pCur->iter);
  }
  tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
674 675 676 677 678 679 680
// Frees a value buffer with the deallocator that matches the active backend:
// tdbFree in the TDB build, taosMemoryFree in the RocksDB build.
void streamFreeVal(void* val) {
#ifndef USE_ROCKSDB
  tdbFree(val);
#else
  taosMemoryFree(val);
#endif
}
5
54liuyao 已提交
681 682

// Inserts or overwrites a value in the session-state table. In the TDB build the
// stored key is the session key combined with the state's current operator number.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey fullKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pSessionStateDb, &fullKey, sizeof(fullKey), value, vLen, pTdb->txn);
#endif
}

// Looks up the session whose start time equals key->win.skey.
//
// On a match, *key is updated to the stored session key and *pVal receives a
// newly allocated copy of the value (*pVLen holds its length).
// Returns 0 on success; -1 when no entry is found, when the entry found starts
// at a different skey, or when the copy cannot be allocated (terrno is set).
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // `tmp` is only read while the cursor is open, so hand the caller a private copy.
      void* pCopy = tdbRealloc(NULL, *pVLen);
      if (pCopy == NULL && *pVLen != 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = -1;
      } else {
        if (pCopy != NULL) {
          memcpy(pCopy, tmp, *pVLen);
        }
        *key = resKey;
        *pVal = pCopy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
#endif
}

// Deletes an entry from the session-state table. In the TDB build the deleted
// key is the session key combined with the state's current operator number.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey fullKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pSessionStateDb, &fullKey, sizeof(fullKey), pTdb->txn);
#endif
}

724
// Creates a session-state cursor and seeks to `key` (combined with the current
// operator number). If the seek reports c >= 0 the cursor is returned where it
// landed; otherwise it is moved back by one entry first.
// Returns NULL when allocation, open, seek or the step back fails.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;

  // Evaluated left to right: open, seek, then (only when c < 0) step back.
  bool ok = tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) >= 0 &&
            (c >= 0 || tdbTbcMoveToPrev(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

755
// Creates a session-state cursor and seeks to `key` (combined with the current
// operator number). If the seek reports c <= 0 the cursor is returned where it
// landed; otherwise it is advanced by one entry first.
// Returns NULL when allocation, open, seek or the advance fails.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;

  // Evaluated left to right: open, seek, then (only when c > 0) step forward.
  bool ok = tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) >= 0 &&
            (c <= 0 || tdbTbcMoveToNext(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

5
54liuyao 已提交
787
// Creates a session-state cursor and seeks to `key` (combined with the current
// operator number). If the seek reports c < 0 the cursor is returned where it
// landed; otherwise it is advanced by one entry first.
// Returns NULL when allocation, open, seek or the advance fails.
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;

  // Evaluated left to right: open, seek, then (only when c >= 0) step forward.
  bool ok = tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &c) >= 0 &&
            (c < 0 || tdbTbcMoveToNext(pCur->pCur) >= 0);
  if (!ok) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

818
// Reads the key/value under a session-state cursor.
// Returns -1 when pCur is NULL, when the cursor read fails, when the stored
// entry's opNum differs from the cursor's number, or when pKey->groupId is
// non-zero and differs from the stored group. On success the stored session key
// is copied into *pKey, the value is reported through pVal / pVLen, and 0 is returned.
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }
  if (pStored->opNum != pCur->number) {
    return -1;
  }

  // A groupId of 0 in the request accepts any group.
  bool groupMismatch = (pKey->groupId != 0) && (pKey->groupId != pStored->key.groupId);
  if (groupMismatch) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

// Zeroes the stored values of session entries for the current operator number.
// TDB build: starting from an all-zero session key, each entry the cursor yields
// has its value buffer zero-filled and written back under the same key. The walk
// stops at the first entry that cannot be read or has no value bytes.
// Always returns 0.
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey      origin = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &origin);

  for (;;) {
    SSessionKey curKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
    int32_t     code = streamStateSessionGetKVByCur(pCur, &curKey, &buf, &size);
    if (code != 0 || size <= 0) {
      break;
    }

    memset(buf, 0, size);
    streamStateSessionPut(pState, &curKey, buf, size);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
#endif
}

865
/*
 * Look for a stored session key whose window overlaps `key` (same group), as judged
 * by sessionRangeKeyCmpr() == 0.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant. Otherwise a
 * cursor is positioned with tdbTbcMoveTo on {key, opNum}; the entry at that position
 * is tested first, and if it does not match, one neighbour is tested: the next entry
 * when tdbTbcMoveTo reported c > 0, the previous entry when it reported c < 0.
 *
 * key     range to search for.
 * curKey  receives the matching stored key; written only on success.
 *
 * Returns 0 when a match is found, -1 otherwise (including allocation or cursor
 * failures).
 */
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;

  int32_t          cmp = 0;
  SStateSessionKey seekKey = {.key = *key, .opNum = pState->number};
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &seekKey, sizeof(SStateSessionKey), &cmp) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  // resKey carries the group filter into the read and receives the stored key on success.
  SSessionKey resKey = *key;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, NULL);
  int32_t     matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

  if (!matched && cmp != 0) {
    // Step one entry in the direction indicated by the seek result and retry.
    if (cmp > 0) {
      streamStateCurNext(pState, pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, NULL);
    matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
  }

  if (matched) {
    *curKey = resKey;
  }
  streamStateFreeCur(pCur);
  return matched ? code : -1;
#endif
}

917 918
/*
 * Fetch the session window that `key`, widened by `gap` on both sides, overlaps;
 * if there is none, hand back a zeroed value buffer for a new window.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant. Otherwise two
 * candidates are examined: the entry found by streamStateSessionSeekKeyCurrentPrev
 * and then the entry after it (or, when the first read fails, the entry found by
 * streamStateSessionSeekKeyNext). A candidate is accepted when
 * sessionRangeKeyCmpr(widened key, candidate) == 0. An accepted entry's value is
 * copied into the new buffer and the entry is deleted from the store.
 *
 * key    in/out: replaced by the stored key when a window is reused, restored to its
 *        original value otherwise.
 * gap    amount subtracted from win.skey and added to win.ekey for the overlap test.
 * pVal   out: set to a buffer of the requested size obtained from tdbRealloc
 *        (presumably released by the caller — TODO confirm).
 * pVLen  in: requested value size; it is also passed to the cursor reads, which may
 *        overwrite it.
 *
 * Returns 0 when an existing window was reused, 1 when none was found, -1 when the
 * buffer could not be allocated.
 */
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
  // todo refactor
  const SSessionKey originKey = *key;
  const int32_t     valSize = *pVLen;

  SSessionKey searchKey = originKey;
  searchKey.win.skey = originKey.win.skey - gap;
  searchKey.win.ekey = originKey.win.ekey + gap;

  void* tmp = tdbRealloc(NULL, valSize);
  if (tmp == NULL) {
    return -1;
  }

  int32_t          found = 0;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    found = (sessionRangeKeyCmpr(&searchKey, key) == 0);
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    // Nothing usable at or before the key: restart from the following entry.
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    found = (sessionRangeKeyCmpr(&searchKey, key) == 0);
  }

  int32_t res;
  if (found) {
    // Take over the stored value and remove the old entry.
    memcpy(tmp, *pVal, valSize);
    streamStateSessionDel(pState, key);
    res = 0;
  } else {
    *key = originKey;
    memset(tmp, 0, valSize);
    res = 1;
  }

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
#endif
}

/*
 * Fetch the state window that `key` belongs to; if there is none, hand back a zeroed
 * value buffer for a new window.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant. Otherwise two
 * candidates are examined: the entry found by streamStateSessionSeekKeyCurrentPrev
 * and then the entry after it (or, when the first read fails, the entry found by
 * streamStateSessionSeekKeyNext). The first candidate is accepted when its window
 * contains the requested window or when fn(pKeyData, stored state key) is true; the
 * second only when fn(...) is true. The stored state key is taken at offset
 * (requested value size - keyDataLen) of the stored value. An accepted entry's value
 * is copied into the new buffer and the entry is deleted from the store.
 *
 * key         in/out: replaced by the stored key when a window is reused, restored
 *             to its original value otherwise.
 * pKeyData    state key of the incoming data, passed to fn.
 * keyDataLen  length of the state key used to locate it inside the stored value.
 * fn          comparator; a result equal to `true` means "same state".
 * pVal        out: set to a buffer of the requested size obtained from tdbRealloc
 *             (presumably released by the caller — TODO confirm).
 * pVLen       in: requested value size; it is also passed to the cursor reads, which
 *             may overwrite it.
 *
 * Returns 0 when an existing window was reused, 1 when none was found, -1 when the
 * buffer could not be allocated.
 */
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  const SSessionKey tmpKey = *key;
  const int32_t     valSize = *pVLen;

  void* tmp = tdbRealloc(NULL, valSize);
  if (tmp == NULL) {
    return -1;
  }

  int32_t          found = 0;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    // Either the stored window already covers the requested one ...
    found = (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey);
    if (!found) {
      // ... or the stored state key compares as the same state.
      void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
      found = (fn(pKeyData, stateKey) == true);
    }
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    // Nothing usable at or before the key: restart from the following entry.
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    found = (fn(pKeyData, stateKey) == true);
  }

  int32_t res;
  if (found) {
    // Take over the stored value and remove the old entry.
    memcpy(tmp, *pVal, valSize);
    streamStateSessionDel(pState, key);
    res = 0;
  } else {
    *key = tmpKey;
    memset(tmp, 0, valSize);
    res = 1;
  }

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
#endif
}
5
54liuyao 已提交
1030

L
Liu Jicong 已提交
1031
/*
 * Store `tag` (tagLen bytes) under `groupId` in the partition-tag table.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant; otherwise the
 * pair is upserted into pParTagDb within the state's transaction.
 * Returns whatever the underlying store call returns.
 */
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
#ifdef USE_ROCKSDB
  return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen);
#else
  // The key is the raw 8-byte group id.
  int32_t code =
      tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(groupId), tag, tagLen, pState->pTdbState->txn);
  return code;
#endif
}

/*
 * Look up the tag stored under `groupId` in the partition-tag table.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant; otherwise
 * pParTagDb is queried and tagVal / tagLen are passed to tdbTbGet to receive the
 * result. Returns whatever the underlying lookup returns.
 */
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
#ifdef USE_ROCKSDB
  return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen);
#else
  // The key is the raw 8-byte group id.
  int32_t code = tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(groupId), tagVal, tagLen);
  return code;
#endif
}

1047
/*
 * Store the table name for `groupId` in the partition-name table.
 *
 * A warning is logged on every call. With USE_ROCKSDB the call is delegated to the
 * _rocksdb variant; otherwise exactly TSDB_TABLE_NAME_LEN bytes starting at `tbname`
 * are upserted into pParNameDb within the state's transaction.
 * Returns whatever the underlying store call returns.
 */
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
  return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
  // The key is the raw 8-byte group id; the value is the fixed-size name buffer.
  int32_t code = tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(groupId), tbname, TSDB_TABLE_NAME_LEN,
                             pState->pTdbState->txn);
  return code;
#endif
}

/*
 * Look up the table name stored under `groupId` in the partition-name table.
 *
 * With USE_ROCKSDB the call is delegated to the _rocksdb variant; otherwise
 * pParNameDb is queried and pVal is passed to tdbTbGet to receive the result.
 * The value length reported by the lookup is discarded.
 * Returns whatever the underlying lookup returns.
 */
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
  return streamStateGetParName_rocksdb(pState, groupId, pVal);
#else
  int32_t ignoredLen;  // length sink only; callers get no length back
  int32_t code = tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(groupId), pVal, &ignoredLen);
  return code;
#endif
}

/*
 * Release a stream state handle.
 *
 * A NULL handle is accepted and ignored, mirroring free(). With USE_ROCKSDB the
 * backend-specific teardown (streamStateDestroy_rocksdb) runs first; afterwards the
 * TDB state block and the handle itself are freed.
 */
void streamStateDestroy(SStreamState* pState) {
  if (pState == NULL) {
    return;
  }

#ifdef USE_ROCKSDB
  // Backend-specific teardown must run while pState is still valid.
  streamStateDestroy_rocksdb(pState);
#endif
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

5
54liuyao 已提交
1075 1076 1077 1078 1079 1080 1081
#if 0
/*
 * Debug helper: render the session keys in pSessionStateDb, starting from the first
 * entry, as text of the form "||s:<skey>,e:<ekey>,g:<groupId>||" repeated once per
 * entry.
 *
 * Iteration stops at the first entry streamStateSessionGetKVByCur rejects. The
 * output buffer is fixed at 2048 bytes; once an entry no longer fits, the dump ends
 * with the last entry that was written completely.
 *
 * Returns a NUL-terminated buffer obtained from taosMemoryCalloc (presumably
 * released by the caller — TODO confirm), or NULL when the cursor cannot be
 * created, the first entry cannot be read, or the buffer cannot be allocated.
 */
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  if (streamStateSessionGetKVByCur(pCur, &key, NULL, NULL) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t len = 0;
  for (;;) {
    int32_t room = size - len;
    int32_t n = snprintf(dumpBuf + len, (size_t)room, "||s:%15" PRId64 ",e:%15" PRId64 ",g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    // snprintf returns the length it WOULD have written; n >= room means the entry was cut off.
    // Never advance past the buffer: drop the partial entry and stop.
    if (n < 0 || n >= room) {
      dumpBuf[len] = '\0';
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    if (streamStateSessionGetKVByCur(pCur, &key, NULL, NULL) != 0) {
      break;
    }
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160

/*
 * Debug helper: render the window keys in pStateDb, starting from the first entry,
 * as text of the form "||s:<ts>,g:<groupId>||" repeated once per entry.
 *
 * Iteration stops at the first entry streamStateGetKVByCur rejects. The output
 * buffer is fixed at 2048 bytes; once an entry no longer fits, the dump ends with
 * the last entry that was written completely.
 *
 * Returns a NUL-terminated buffer obtained from taosMemoryCalloc (presumably
 * released by the caller — TODO confirm), or NULL when the cursor cannot be
 * created, the first entry cannot be read, or the buffer cannot be allocated.
 */
char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  if (streamStateGetKVByCur(pCur, &key, NULL, NULL) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t len = 0;
  for (;;) {
    int32_t room = size - len;
    int32_t n =
        snprintf(dumpBuf + len, (size_t)room, "||s:%15" PRId64 ",g:%15" PRId64 "||", key.ts, key.groupId);
    // snprintf returns the length it WOULD have written; n >= room means the entry was cut off.
    // Never advance past the buffer: drop the partial entry and stop.
    if (n < 0 || n >= room) {
      dumpBuf[len] = '\0';
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    if (streamStateGetKVByCur(pCur, &key, NULL, NULL) != 0) {
      break;
    }
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1161
#endif