streamState.c 31.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

  if (pWin1->key.ts > pWin2->key.ts) {
    return 1;
  } else if (pWin1->key.ts < pWin2->key.ts) {
    return -1;
  }

  if (pWin1->key.groupId > pWin2->key.groupId) {
    return 1;
  } else if (pWin1->key.groupId < pWin2->key.groupId) {
    return -1;
  }

  return 0;
}

105
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
dengyihao's avatar
dengyihao 已提交
106
  qWarn("open stream state, %s", path);
107 108 109 110 111
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
112 113 114 115 116 117
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }
L
Liu Jicong 已提交
118

119
  char statePath[1024];
L
Liu Jicong 已提交
120 121 122
  if (!specPath) {
    sprintf(statePath, "%s/%d", path, pTask->taskId);
  } else {
123 124
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
125
  }
dengyihao's avatar
dengyihao 已提交
126 127 128 129 130 131 132 133 134 135 136 137
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    taosMemoryFree(pState);
    pState = NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152
  sprintf(cfgPath, "%s/cfg", statePath);

  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    int64_t size;
    taosFStatFile(pCfgFile, &size, NULL);
    taosReadFile(pCfgFile, cfg, size);
    sscanf(cfg, "%d\n%d\n", &szPage, &pages);
  } else {
    taosMulModeMkDir(statePath, 0755);
    pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
L
Liu Jicong 已提交
153 154
    szPage = szPage < 0 ? 4096 : szPage;
    pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
155 156 157 158 159
    sprintf(cfg, "%d\n%d\n", szPage, pages);
    taosWriteFile(pCfgFile, cfg, strlen(cfg));
  }
  taosCloseFile(&pCfgFile);

160
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
161 162 163 164
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
165 166
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
167 168 169
    goto _err;
  }

5
54liuyao 已提交
170
  // todo refactor
5
54liuyao 已提交
171 172
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
173 174 175
    goto _err;
  }

5
54liuyao 已提交
176 177
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
178 179 180
    goto _err;
  }

5
54liuyao 已提交
181 182
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
183 184 185
    goto _err;
  }

5
54liuyao 已提交
186 187
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
188 189 190
    goto _err;
  }

L
Liu Jicong 已提交
191 192 193 194 195
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

196
  if (streamStateBegin(pState) < 0) {
197 198 199
    goto _err;
  }

5
54liuyao 已提交
200
  pState->pTdbState->pOwner = pTask;
201 202 203 204

  return pState;

_err:
5
54liuyao 已提交
205 206 207 208 209
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
210
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
211 212
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
213
  return NULL;
dengyihao's avatar
dengyihao 已提交
214
#endif
215 216 217
}

void streamStateClose(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
218
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
219
  // streamCleanBackend(pState);
dengyihao's avatar
dengyihao 已提交
220
#else
221 222
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
223 224 225 226 227
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
228
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
229
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
230
#endif
5
54liuyao 已提交
231
  streamStateDestroy(pState);
232 233 234
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
235 236 237
#ifdef USE_ROCKSDB
  return 0;
#else
238
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
239
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
240
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
241 242 243
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
244
#endif
245 246 247
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
248 249 250
#ifdef USE_ROCKSDB
  return 0;
#else
251
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
252 253
    return -1;
  }
254
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
255 256
    return -1;
  }
257

258
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
259
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
260 261 262
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
263
#endif
264 265 266
}

int32_t streamStateAbort(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
267 268 269
#ifdef USE_ROCKSDB
  return 0;
#else
270
  if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
271 272
    return -1;
  }
273

274
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
275
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
276 277 278
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
279
#endif
280 281
}

282
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
283 284 285
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
286
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
287
#endif
288 289
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
290 291 292
#ifdef USE_ROCKSDB
  return streamStateFuncGet(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
293
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
294
#endif
295 296 297
}

int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
dengyihao's avatar
dengyihao 已提交
298 299 300
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
301
  return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
302
#endif
303 304
}

305
// todo refactor
306
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
307 308 309
#ifdef USE_ROCKSDB
  return streamStatePut_rocksdb(pState, key, value, vLen);
#else
310
  SStateKey sKey = {.key = *key, .opNum = pState->number};
311
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
312
#endif
313
}
5
54liuyao 已提交
314 315 316

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
317 318 319
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
320
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
321
#endif
5
54liuyao 已提交
322 323
}

324
// todo refactor
325
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
326 327 328
#ifdef USE_ROCKSDB
  return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
329
  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
330
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
331
#endif
332 333
}

5
54liuyao 已提交
334 335
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
336 337 338
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
339
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
340
#endif
5
54liuyao 已提交
341 342
}

343
// todo refactor
344
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
345 346 347
#ifdef USE_ROCKSDB
  return streamStateDel_rocksdb(pState, key);
#else
348
  SStateKey sKey = {.key = *key, .opNum = pState->number};
349
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
350
#endif
351 352
}

353
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
354 355 356
#ifdef USE_ROCKSDB
  return streamStateClear_rocksdb(pState);
#else
357 358 359 360
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
361 362
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
363
    streamStateFreeCur(pCur);
364 365 366 367 368 369 370
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
371
#endif
372 373 374 375
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

5
54liuyao 已提交
376 377
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
378 379 380
#ifdef USE_ROCKSDB
  return streamStateFillDel_rocksdb(pState, key);
#else
381
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
382
#endif
5
54liuyao 已提交
383 384
}

385
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
386 387 388
#ifdef USE_ROCKSDB
  return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
#else
389 390 391 392 393
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
394 395 396
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
397
#endif
398 399 400 401
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
5
54liuyao 已提交
402 403 404
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
405 406 407
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
408
  streamFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
409
#endif
410 411 412
  return 0;
}

413
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
414 415 416
#ifdef USE_ROCKSDB
  return streamStateGetCur_rocksdb(pState, key);
#else
417 418
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
419
  tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
420

dengyihao's avatar
dengyihao 已提交
421
  int32_t c = 0;
422
  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
423
  tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
424
  if (c != 0) {
5
54liuyao 已提交
425
    streamStateFreeCur(pCur);
426 427
    return NULL;
  }
428
  pCur->number = pState->number;
429
  return pCur;
dengyihao's avatar
dengyihao 已提交
430
#endif
431 432
}

5
54liuyao 已提交
433
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
434 435 436
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
437 438
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
439
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
440

5
54liuyao 已提交
441
  int32_t c = 0;
5
54liuyao 已提交
442 443
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
444
    streamStateFreeCur(pCur);
5
54liuyao 已提交
445 446 447
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
448
#endif
5
54liuyao 已提交
449 450 451
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
452 453 454
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
455 456 457 458 459 460
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
461
    streamStateFreeCur(pCur);
5
54liuyao 已提交
462 463
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
464
#endif
5
54liuyao 已提交
465 466
}

467
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
468 469 470
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
471 472 473 474
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
475
  int32_t kLen;
476 477 478 479 480 481 482 483
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
484
#endif
485 486 487
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
488 489 490
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
491 492 493
  if (!pCur) {
    return -1;
  }
494
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
495
  int32_t kLen;
496 497 498 499 500
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
501
#endif
502 503
}

5
54liuyao 已提交
504
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
505 506 507
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
508 509 510
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
511
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
512
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
513 514 515 516 517 518
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
519
#endif
5
54liuyao 已提交
520 521
}

522
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
523 524 525
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
526 527 528 529
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
530
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
531
  streamStateFreeCur(pCur);
532 533
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
534
#endif
535 536
}

537 538 539 540 541 542 543 544 545 546
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
  //
  return tdbTbcMoveToFirst(pCur->pCur);
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
  //
  return tdbTbcMoveToLast(pCur->pCur);
}

547
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
548 549 550
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
551 552 553 554 555
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
556
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
557
    streamStateFreeCur(pCur);
558 559 560 561
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
562
  int32_t c = 0;
563
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
564
    streamStateFreeCur(pCur);
565 566 567 568 569
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
570
    streamStateFreeCur(pCur);
571 572 573 574
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
575
#endif
576 577
}

5
54liuyao 已提交
578
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
579 580 581
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
582
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
583
  if (!pCur) {
584 585
    return NULL;
  }
5
54liuyao 已提交
586
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
587
    streamStateFreeCur(pCur);
5
54liuyao 已提交
588 589
    return NULL;
  }
590

5
54liuyao 已提交
591
  int32_t c = 0;
592
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
593
    streamStateFreeCur(pCur);
594 595 596 597 598
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
599
    streamStateFreeCur(pCur);
600 601 602 603
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
604
#endif
605 606
}

5
54liuyao 已提交
607
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
608 609 610
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
611 612 613 614
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
615
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
616
    streamStateFreeCur(pCur);
5
54liuyao 已提交
617 618
    return NULL;
  }
619

5
54liuyao 已提交
620
  int32_t c = 0;
621
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
622
    streamStateFreeCur(pCur);
623 624 625 626 627
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
628
    streamStateFreeCur(pCur);
629 630 631 632
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
633
#endif
634 635 636
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
637 638 639
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
640 641 642
  if (!pCur) {
    return -1;
  }
643 644
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
645
#endif
646 647 648
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
649 650 651
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
652 653 654
  if (!pCur) {
    return -1;
  }
655
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
656
#endif
657
}
658
void streamStateFreeCur(SStreamStateCur* pCur) {
659 660 661
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
662
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
663
  tdbTbcClose(pCur->pCur);
664 665 666
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
667 668 669 670 671 672 673
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
674 675

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
676 677 678
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
679
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
680
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
681
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
682
#endif
5
54liuyao 已提交
683 684 685
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
686 687 688 689
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

690
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
691 692 693
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
694
  if (code == 0) {
5
54liuyao 已提交
695 696 697 698 699 700 701
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
702 703
  }
  streamStateFreeCur(pCur);
704
  return code;
dengyihao's avatar
dengyihao 已提交
705
#endif
5
54liuyao 已提交
706 707 708
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
709 710 711
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
712
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
713
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
714
#endif
5
54liuyao 已提交
715 716
}

717
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
718 719 720
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
721 722 723 724 725
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
726
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
727 728 729 730 731
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
732
  int32_t c = 0;
5
54liuyao 已提交
733 734 735 736
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
737
  if (c >= 0) return pCur;
5
54liuyao 已提交
738 739 740 741 742 743 744

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
745
#endif
5
54liuyao 已提交
746 747
}

748
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
749 750 751
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
752 753 754 755 756
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
757
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
758 759 760 761 762
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
763
  int32_t c = 0;
764 765 766 767 768 769 770 771 772 773 774 775 776
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
777
#endif
778 779
}

5
54liuyao 已提交
780
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
781 782 783
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
784 785 786 787 788
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
789
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
790 791 792 793 794
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
795
  int32_t c = 0;
5
54liuyao 已提交
796 797 798 799
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
800
  if (c < 0) return pCur;
5
54liuyao 已提交
801 802 803 804 805 806 807

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
808
#endif
5
54liuyao 已提交
809 810
}

811
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
812 813 814
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
815 816 817
  if (!pCur) {
    return -1;
  }
818
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
819
  int32_t kLen;
820
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
821 822 823 824 825 826 827 828 829 830
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
831
#endif
5
54liuyao 已提交
832 833 834
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
835 836 837 838
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
839
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
840 841
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
842 843 844
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
845
    if (code == 0 && size > 0) {
5
54liuyao 已提交
846 847 848 849 850 851 852 853 854
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
855
#endif
5
54liuyao 已提交
856 857
}

858
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
859 860 861
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
862 863 864 865 866
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
867
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
868 869
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
870 871
  }

872
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
873
  int32_t c = 0;
874 875 876 877 878 879
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
880
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
902 903
    }
  }
904

905
  streamStateFreeCur(pCur);
906
  return -1;
dengyihao's avatar
dengyihao 已提交
907
#endif
908 909
}

910 911
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
912 913 914
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
915
  // todo refactor
dengyihao's avatar
dengyihao 已提交
916
  int32_t res = 0;
917 918 919 920 921
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
922
  void* tmp = tdbRealloc(NULL, valSize);
923 924 925 926 927
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
928
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
929 930 931 932 933 934 935 936 937
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
938
    streamStateFreeCur(pCur);
939
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
940
  }
941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
958
  streamStateFreeCur(pCur);
959
  return res;
dengyihao's avatar
dengyihao 已提交
960 961

#endif
5
54liuyao 已提交
962 963 964 965 966
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
967 968 969 970 971

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
972
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
973 974
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
975 976 977 978
  if (!tmp) {
    return -1;
  }

979
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
980
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
981 982 983
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
984
      streamStateSessionDel(pState, key);
985 986
      goto _end;
    }
5
54liuyao 已提交
987 988 989 990

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
991
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
992 993
      goto _end;
    }
5
54liuyao 已提交
994 995 996 997 998 999

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
1000 1001
  }

1002
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1003
  if (code == 0) {
5
54liuyao 已提交
1004 1005 1006
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1007
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1021
#endif
5
54liuyao 已提交
1022
}
5
54liuyao 已提交
1023

L
Liu Jicong 已提交
1024
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
dengyihao's avatar
dengyihao 已提交
1025 1026 1027
#ifdef USE_ROCKSDB
  return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen);
#else
L
Liu Jicong 已提交
1028
  return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1029
#endif
L
Liu Jicong 已提交
1030 1031 1032
}

int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
dengyihao's avatar
dengyihao 已提交
1033 1034 1035
#ifdef USE_ROCKSDB
  return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen);
#else
L
Liu Jicong 已提交
1036
  return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
dengyihao's avatar
dengyihao 已提交
1037
#endif
L
Liu Jicong 已提交
1038 1039
}

1040
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
dengyihao's avatar
dengyihao 已提交
1041
  qWarn("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1042 1043 1044
#ifdef USE_ROCKSDB
  return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
L
Liu Jicong 已提交
1045 1046
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1047
#endif
1048 1049 1050
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1051 1052 1053
#ifdef USE_ROCKSDB
  return streamStateGetParName_rocksdb(pState, groupId, pVal);
#else
1054
  int32_t len;
5
54liuyao 已提交
1055
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1056
#endif
5
54liuyao 已提交
1057 1058 1059
}

void streamStateDestroy(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
1060 1061 1062 1063
#ifdef USE_ROCKSDB
  streamStateDestroy_rocksdb(pState);
  // do nothong
#endif
5
54liuyao 已提交
1064 1065
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1066 1067
}

5
54liuyao 已提交
1068 1069 1070 1071 1072 1073 1074
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1075
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1076 1077 1078 1079 1080 1081
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1082 1083 1084
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1085
  if (code != 0) {
1086
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1101
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1102 1103 1104 1105 1106 1107
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1108
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1109 1110
  return dumpBuf;
}
5
54liuyao 已提交
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1154
#endif