streamState.c 31.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
// Comparator for SStateKey blobs (this file registers it for "state.db").
// Orders by opNum, then key.ts, then key.groupId; returns 1 / -1 at the first
// field that differs and 0 when all three match. The length arguments are not used.
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  const SStateKey* pLeft = (const SStateKey*)pKey1;
  const SStateKey* pRight = (const SStateKey*)pKey2;

  if (pLeft->opNum != pRight->opNum) {
    return (pLeft->opNum > pRight->opNum) ? 1 : -1;
  }

  if (pLeft->key.ts != pRight->key.ts) {
    return (pLeft->key.ts > pRight->key.ts) ? 1 : -1;
  }

  if (pLeft->key.groupId != pRight->key.groupId) {
    return (pLeft->key.groupId > pRight->key.groupId) ? 1 : -1;
  }

  return 0;
}

105
// Allocates a stream state object and opens its storage backend.
//
//   path     - base directory. When specPath is false, "<path>/<taskId>" is
//              used as the state directory; otherwise path is used as-is.
//   pTask    - task recorded as pTdbState->pOwner on success.
//   specPath - see path.
//   szPage, pages - used by the tdb build only. If "<statePath>/cfg" exists,
//              both values are read from it; otherwise negative values fall
//              back to 4096 / 256 and the resulting pair is written to cfg.
//
// Returns the new state, or NULL on any failure.
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qWarn("open stream state, %s", path);
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }

  char statePath[1024];
  if (!specPath) {
    // bounded formatting: path length is caller-controlled
    snprintf(statePath, sizeof(statePath), "%s/%d", path, pTask->taskId);
  } else {
    memset(statePath, 0, sizeof(statePath));
    tstrncpy(statePath, path, sizeof(statePath));
  }
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    // Return right away: falling through would dereference the state that was
    // just released. Both allocations made above are freed here.
    taosMemoryFree(pState->pTdbState);
    taosMemoryFree(pState);
    return NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
  return pState;

#else

  char cfgPath[1030];
  snprintf(cfgPath, sizeof(cfgPath), "%s/cfg", statePath);

  char cfg[1024];
  memset(cfg, 0, sizeof(cfg));
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    int64_t size = 0;  // stays 0 if the stat call does not fill it in
    taosFStatFile(pCfgFile, &size, NULL);
    // never read past the buffer, and keep the last byte as the terminator for sscanf
    if (size > (int64_t)sizeof(cfg) - 1) {
      size = (int64_t)sizeof(cfg) - 1;
    }
    taosReadFile(pCfgFile, cfg, size);
    sscanf(cfg, "%d\n%d\n", &szPage, &pages);
  } else {
    taosMulModeMkDir(statePath, 0755);
    pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
    szPage = szPage < 0 ? 4096 : szPage;
    pages = pages < 0 ? 256 : pages;
    snprintf(cfg, sizeof(cfg), "%d\n%d\n", szPage, pages);
    // skip the write if the cfg file could not be created
    if (pCfgFile != NULL) {
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
  }
  taosCloseFile(&pCfgFile);

  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
    goto _err;
  }

  // open state storage backend
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
    goto _err;
  }

  // todo refactor
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

  if (streamStateBegin(pState) < 0) {
    goto _err;
  }

  pState->pTdbState->pOwner = pTask;

  return pState;

_err:
  // close whatever was opened; tables that were never opened are still zero from calloc
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbTbClose(pState->pTdbState->pParTagDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
  return NULL;
#endif
}

// Shuts down the storage backend of pState and then destroys the state object.
// In the tdb build the current transaction is committed before the tables and
// the database handle are closed.
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
  streamCleanBackend(pState);
#else
  STdbState* pTdb = pState->pTdbState;

  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);
#endif
  streamStateDestroy(pState);
}

// Starts a write transaction (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) and
// stores it in pTdbState->txn. If tdbBegin fails, tdbAbort is called and -1 is
// returned. The RocksDB build does nothing and returns 0.
int32_t streamStateBegin(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) >= 0) {
    return 0;
  }

  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
#endif
}

// Commits the current transaction (tdbCommit then tdbPostCommit) and begins a
// new one. Returns -1 as soon as any of the three steps fails, 0 otherwise.
// The RocksDB build does nothing and returns 0.
int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbCommit(pTdb->db, pTdb->txn) < 0 || tdbPostCommit(pTdb->db, pTdb->txn) < 0) {
    return -1;
  }

  // open the follow-up transaction
  int32_t began = tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (began < 0) ? -1 : 0;
#endif
}

// Aborts the current transaction and begins a new one. Returns -1 if either
// step fails, 0 otherwise. The RocksDB build does nothing and returns 0.
int32_t streamStateAbort(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbAbort(pTdb->db, pTdb->txn) < 0) {
    return -1;
  }

  // open the follow-up transaction
  int32_t began = tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (began < 0) ? -1 : 0;
#endif
}

282
// Upserts (key -> value[0..vLen)) into the function-state table within the
// current transaction; the RocksDB build forwards to streamStateFuncPut_rocksdb.
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFuncStateDb, key, sizeof(*key), value, vLen, pTdb->txn);
#endif
}
// Looks up `key` in the function-state table and reports the value through
// pVal / pVLen; the RocksDB build forwards to streamStateFuncGet_rocksdb.
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbGet(pTdb->pFuncStateDb, key, sizeof(*key), pVal, pVLen);
#endif
}

// Deletes `key` from the function-state table within the current transaction;
// the RocksDB build forwards to streamStateFuncDel_rocksdb.
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFuncStateDb, key, sizeof(*key), pTdb->txn);
#endif
}

305
// todo refactor
306
// Upserts a value into the state table. In the tdb build the stored key is an
// SStateKey made of *key plus the operator number currently set on pState.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStatePut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pStateDb, &sKey, sizeof(sKey), value, vLen, pTdb->txn);
#endif
}
5
54liuyao 已提交
314 315

// todo refactor
dengyihao's avatar
dengyihao 已提交
316
// Looks up a value in the state table. In the tdb build the lookup key is an
// SStateKey made of *key plus the operator number currently set on pState.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pTdb->pStateDb, &sKey, sizeof(sKey), pVal, pVLen);
#endif
}
324
// todo refactor
dengyihao's avatar
dengyihao 已提交
325
// Deletes an entry from the state table. In the tdb build the key is an
// SStateKey made of *key plus the operator number currently set on pState.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pStateDb, &sKey, sizeof(sKey), pTdb->txn);
#endif
}

// todo refactor
// Upserts (key -> value[0..vLen)) into the fill-state table; the key is the
// plain SWinKey, no operator number is attached.
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(*key), value, vLen, pTdb->txn);
#endif
}

5
54liuyao 已提交
343 344
// todo refactor
// Looks up `key` in the fill-state table and reports the value through pVal / pVLen.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbGet(pTdb->pFillStateDb, key, sizeof(*key), pVal, pVLen);
#endif
}

352
// todo refactor
dengyihao's avatar
dengyihao 已提交
353
// Deletes `key` from the fill-state table within the current transaction.
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFillStateDb, key, sizeof(*key), pTdb->txn);
#endif
}

361
// Removes the state-table entries reachable for the current operator number.
// The tdb build writes a {ts = 0, groupId = 0} entry with an empty value, then
// repeatedly seeks to the entry after it and deletes that entry until the seek
// no longer yields a key. Always returns 0.
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateClear_rocksdb(pState);
#else
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);

  for (;;) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
    SWinKey          delKey = {0};
    int32_t          code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
    streamStateFreeCur(pCur);

    if (code != 0) {
      break;  // nothing left after the start key
    }
    streamStateDel(pState, &delKey);
  }

  return 0;
#endif
}

// Stores `number` in pState->number. The tdb code paths in this file use that
// field as the opNum part of the keys they compose.
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

384
// Fetches the value stored under `key`; if there is none, hands back a freshly
// allocated, zero-filled buffer instead.
//
//   pVLen - in: size to allocate when the key is missing (read before the lookup).
//   pVal  - out: the stored value, or the new zeroed buffer.
//
// Returns 0 on success. In the tdb build, returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY (and *pVal == NULL) if the buffer cannot be allocated.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
#else
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  *pVal = tdbRealloc(NULL, size);
  if (*pVal == NULL) {
    // do not hand a NULL buffer to memset
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memset(*pVal, 0, size);
  return 0;
#endif
}

// Releases a value buffer previously handed out by this module.
// A NULL pVal is accepted and ignored. pState and key are not used. Always returns 0.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  qDebug("streamStateReleaseBuf");
  if (pVal != NULL) {
    // streamFreeVal calls taosMemoryFree in the RocksDB build and tdbFree otherwise
    streamFreeVal(pVal);
  }
  return 0;
}

413
// Opens a cursor on the state table positioned exactly on `key` (combined with
// the operator number currently set on pState).
// Returns NULL if allocation fails, if the cursor cannot be opened or moved,
// or if no entry with exactly that key exists. The caller releases the cursor
// with streamStateFreeCur.
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // check the open result, as the seek functions in this file do
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t   c = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  // a failed move or a non-zero comparison result both mean "not positioned on key"
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  pCur->number = pState->number;
  return pCur;
#endif
}

5
54liuyao 已提交
433
// Opens a cursor on the fill-state table positioned exactly on `key`.
// Returns NULL if allocation fails, if the cursor cannot be opened or moved,
// or if no entry with exactly that key exists. The caller releases the cursor
// with streamStateFreeCur.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // check the open result, as the seek functions in this file do
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  // a failed move or a non-zero comparison result both mean "not positioned on key"
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

// Opens a fill-state cursor on `key` and keeps it only if the entry under the
// cursor can be read and belongs to the same group (see
// streamStateGetGroupKVByCur, which may also overwrite *key).
// Returns the cursor, or NULL when there is no such entry.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
#endif
}

467
// Reads the entry under a state-table cursor.
// On success copies the window key into *pKey and returns 0; the value is
// reported through pVal / pVLen by tdbTbcGet.
// Returns -1 for a NULL cursor, when tdbTbcGet fails, or when the stored
// entry's opNum differs from the cursor's number.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  const SStateKey* pStored = NULL;
  int32_t          keyLen = 0;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &keyLen, pVal, pVLen) < 0) {
    return -1;
  }

  // entry belongs to a different operator
  if (pStored->opNum != pCur->number) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

// Reads the entry under a fill-state cursor.
// On success copies the stored key into *pKey and returns 0; the value is
// reported through pVal / pVLen by tdbTbcGet.
// Returns -1 for a NULL cursor or when tdbTbcGet fails.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  const SWinKey* pStored = NULL;
  int32_t        keyLen = 0;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &keyLen, pVal, pVLen) < 0) {
    return -1;
  }

  *pKey = *pStored;
  return 0;
#endif
}

5
54liuyao 已提交
504
// Reads the entry under a fill-state cursor and accepts it only if its groupId
// equals the groupId that *pKey carried on entry.
// Note that *pKey is overwritten with the stored key whenever the read itself
// succeeds, even if the group check then fails.
// Returns 0 on a match, -1 otherwise (including a NULL cursor).
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  const uint64_t wantedGroup = pKey->groupId;
  if (streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }

  return (pKey->groupId == wantedGroup) ? 0 : -1;
#endif
}

522
// Reports in *key the state-table entry that follows {ts = 0, groupId = 0} for
// the current operator number.
// The tdb build writes that {0, 0} entry with an empty value, seeks to the
// entry after it, reads it, and finally deletes the {0, 0} entry again.
// Returns the result of the read (0 on success).
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
  // todo refactor
  SWinKey startKey = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &startKey, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &startKey);
  int32_t          code = streamStateGetKVByCur(pCur, key, NULL, 0);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &startKey);
  return code;
#endif
}

537
// Moves the cursor to the first entry. pState is not used; pCur must be non-NULL.
// The RocksDB build always returns 0; the tdb build returns the result of
// tdbTbcMoveToFirst.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
  int32_t code = tdbTbcMoveToFirst(pCur->pCur);
  return code;
#endif
}

// Moves the cursor to the last entry. pState is not used; pCur must be non-NULL.
// The RocksDB build always returns 0; the tdb build returns the result of
// tdbTbcMoveToLast.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
  int32_t code = tdbTbcMoveToLast(pCur->pCur);
  return code;
#endif
}

555
// Opens a state-table cursor and seeks to `key` (combined with the operator
// number currently set on pState). If the comparison result reported by
// tdbTbcMoveTo is positive the cursor is returned as positioned; otherwise it
// is advanced by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  int32_t   cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &cmp) < 0) {
    goto _err;
  }
  // step forward unless the seek reported a positive comparison result
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
586
// Opens a fill-state cursor and seeks to `key`. If the comparison result
// reported by tdbTbcMoveTo is positive the cursor is returned as positioned;
// otherwise it is advanced by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) {
    goto _err;
  }
  // step forward unless the seek reported a positive comparison result
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
615
// Opens a fill-state cursor and seeks to `key`. If the comparison result
// reported by tdbTbcMoveTo is negative the cursor is returned as positioned;
// otherwise it is moved back by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) {
    goto _err;
  }
  // step back unless the seek reported a negative comparison result
  if (cmp >= 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

// Advances the cursor by one entry. In the tdb build a NULL cursor yields -1;
// otherwise the result of tdbTbcMoveToNext is returned. pState is not used there.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
  return (pCur != NULL) ? tdbTbcMoveToNext(pCur->pCur) : -1;
#endif
}

// Moves the cursor back by one entry. In the tdb build a NULL cursor yields -1;
// otherwise the result of tdbTbcMoveToPrev is returned. pState is not used there.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
  return (pCur != NULL) ? tdbTbcMoveToPrev(pCur->pCur) : -1;
#endif
}
666
// Releases a cursor created by this module: destroys its RocksDB iterator (if
// it has one), closes its tdb cursor and frees the cursor object.
// A NULL pCur is accepted and ignored.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }
  qDebug("streamStateFreeCur");
  // Cursors built by the tdb paths in this file are calloc'd and never set
  // `iter`, so only destroy an iterator that actually exists.
  if (pCur->iter != NULL) {
    rocksdb_iter_destroy(pCur->iter);
  }
  tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
676 677 678 679 680 681 682
// Frees a value buffer with the allocator that matches the build:
// tdbFree for the tdb build, taosMemoryFree when USE_ROCKSDB is defined.
void streamFreeVal(void* val) {
#ifndef USE_ROCKSDB
  tdbFree(val);
#else
  taosMemoryFree(val);
#endif
}
5
54liuyao 已提交
683 684

// Upserts a value into the session-state table. In the tdb build the stored
// key is an SStateSessionKey made of *key plus the operator number currently
// set on pState.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pSessionStateDb, &sKey, sizeof(sKey), value, vLen, pTdb->txn);
#endif
}

// Looks up the session entry whose window starts at key->win.skey.
//
// The tdb build seeks with streamStateSessionSeekKeyCurrentNext and reads the
// entry under the cursor. If that entry's skey equals key->win.skey, *key is
// replaced by the stored key and *pVal receives a newly allocated copy of the
// value (*pVLen bytes).
//
// Returns 0 on success. Returns -1 if no entry could be read, if the entry
// found has a different skey, or if the copy cannot be allocated (terrno is
// set to TSDB_CODE_OUT_OF_MEMORY in that last case); *key and *pVal are left
// untouched on failure.
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      void* pBuf = tdbRealloc(NULL, *pVLen);
      if (pBuf == NULL) {
        // do not hand a NULL destination to memcpy
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = -1;
      } else {
        memcpy(pBuf, tmp, *pVLen);
        *key = resKey;
        *pVal = pBuf;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
#endif
}

// Deletes an entry from the session-state table. In the tdb build the key is
// an SStateSessionKey made of *key plus the operator number currently set on pState.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pSessionStateDb, &sKey, sizeof(sKey), pTdb->txn);
#endif
}

726
// Opens a session-state cursor and seeks to `key` (combined with the operator
// number currently set on pState). If the comparison result reported by
// tdbTbcMoveTo is zero or positive the cursor is returned as positioned;
// otherwise it is moved back by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) {
    goto _err;
  }
  // step back only when the seek reported a negative comparison result
  if (cmp < 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

757
// Opens a session-state cursor and seeks to `key` (combined with the operator
// number currently set on pState). If the comparison result reported by
// tdbTbcMoveTo is zero or negative the cursor is returned as positioned;
// otherwise it is advanced by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) {
    goto _err;
  }
  // step forward only when the seek reported a positive comparison result
  if (cmp > 0 && tdbTbcMoveToNext(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
789
// Opens a session-state cursor and seeks to `key` (combined with the operator
// number currently set on pState). If the comparison result reported by
// tdbTbcMoveTo is negative the cursor is returned as positioned; otherwise it
// is advanced by one entry first.
// Returns NULL if allocation or any cursor operation fails. The caller
// releases the cursor with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    goto _err;
  }
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) {
    goto _err;
  }
  // step forward unless the seek reported a negative comparison result
  if (cmp >= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) {
    goto _err;
  }
  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

820
// Reads the entry under a session-state cursor.
// On success copies the stored session key into *pKey and returns 0; the value
// is reported through pVal / pVLen by tdbTbcGet.
// Returns -1 for a NULL cursor, when tdbTbcGet fails, when the stored entry's
// opNum differs from the cursor's number, or when pKey->groupId is non-zero on
// entry and differs from the stored groupId.
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey* pStored = NULL;
  int32_t           keyLen = 0;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &keyLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }

  // entry belongs to a different operator
  if (pStored->opNum != pCur->number) {
    return -1;
  }

  // a groupId of 0 in the request accepts any group
  if (pKey->groupId == 0 || pKey->groupId == pStored->key.groupId) {
    *pKey = pStored->key;
    return 0;
  }
  return -1;
#endif
}

// Zero-fills the values of session-state entries for the current operator number.
// The tdb build walks forward from an all-zero session key; for each entry it
// zeroes the value buffer returned by the cursor and upserts it back under the
// same key. The walk stops at the first entry that cannot be read or whose
// value is empty. Always returns 0.
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey      key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);

  for (;;) {
    SSessionKey delKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;

    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
    if (code != 0 || size <= 0) {
      break;
    }

    memset(buf, 0, size);
    streamStateSessionPut(pState, &delKey, buf, size);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
#endif
}

867
/*
 * Find a stored session key whose window overlaps the window of `key`
 * (as judged by sessionRangeKeyCmpr) and copy it to *curKey.
 *
 * The cursor is positioned with tdbTbcMoveTo. The entry it lands on is tested
 * first; if that does not match, exactly one neighbour is tested: the next
 * entry when the move reported c > 0, the previous one when c < 0.
 *
 * Returns 0 and fills *curKey on a match. Returns -1 when the cursor cannot be
 * allocated, opened or positioned, or when no tested entry matches (*curKey is
 * left untouched in that case).
 */
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SStateSessionKey seekKey = {.key = *key, .opNum = pState->number};
  int32_t          c = 0;
  if (tdbTbcMoveTo(pCur->pCur, &seekKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  int32_t     ret = -1;
  SSessionKey candidate = *key;

  // First candidate: the entry the cursor landed on.
  int32_t code = streamStateSessionGetKVByCur(pCur, &candidate, NULL, NULL);
  bool    hit = (code == 0) && (sessionRangeKeyCmpr(key, &candidate) == 0);

  // Second candidate: one step in the direction indicated by c (none if c == 0).
  if (!hit && c != 0) {
    if (c > 0) {
      streamStateCurNext(pState, pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    code = streamStateSessionGetKVByCur(pCur, &candidate, NULL, NULL);
    hit = (code == 0) && (sessionRangeKeyCmpr(key, &candidate) == 0);
  }

  if (hit) {
    *curKey = candidate;
    ret = code;
  }

  streamStateFreeCur(pCur);
  return ret;
#endif
}

919 920
/*
 * Look for an existing session window that overlaps `key` widened by `gap` on
 * both sides. If one exists, hand back a copy of its value and remove it from
 * the store; otherwise hand back a zero-filled value.
 *
 * key   - in: the window being added; out: the matched stored key when a
 *         window was found, otherwise restored to its input value.
 * gap   - amount subtracted from skey and added to ekey to form the search range.
 * pVal  - out: buffer obtained from tdbRealloc, sized by the input *pVLen.
 * pVLen - in: size of the value buffer to allocate; it is also passed to
 *         streamStateSessionGetKVByCur, which may overwrite it.
 *
 * Returns 0 when an existing window was found (and deleted), 1 when none was
 * found, and -1 when the output buffer cannot be allocated.
 */
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
  // todo refactor
  const SSessionKey inputKey = *key;
  const int32_t     bufLen = *pVLen;

  SSessionKey range = inputKey;
  range.win.skey = inputKey.win.skey - gap;
  range.win.ekey = inputKey.win.ekey + gap;

  void* pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool found = false;

  // First candidate: the entry at or before the key.
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    found = (sessionRangeKeyCmpr(&range, key) == 0);
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    // Nothing readable there: restart from the entry after the original key.
    *key = inputKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second candidate: the following entry.
  if (!found) {
    found = (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) && (sessionRangeKeyCmpr(&range, key) == 0);
  }

  if (found) {
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = inputKey;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pCur);
  return found ? 0 : 1;
#endif
}

/*
 * Look for an existing state window that `key` belongs to. If one exists, hand
 * back a copy of its value and remove it from the store; otherwise hand back a
 * zero-filled value.
 *
 * A stored window qualifies when
 *   - it is the entry at or before `key` and its [skey, ekey] fully covers the
 *     input window, or
 *   - `fn(pKeyData, storedStateKey)` is true, where the stored state key is the
 *     last keyDataLen bytes of the value (offset computed from the input *pVLen).
 * At most two entries are examined: the one at/before the key and the next one.
 *
 * key   - in: the window being added; out: the matched stored key when a
 *         window was found, otherwise restored to its input value.
 * pVal  - out: buffer obtained from tdbRealloc, sized by the input *pVLen.
 * pVLen - in: size of the value buffer to allocate; it is also passed to
 *         streamStateSessionGetKVByCur, which may overwrite it.
 *
 * Returns 0 when an existing window was found (and deleted), 1 when none was
 * found, and -1 when the output buffer cannot be allocated.
 */
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  const SSessionKey inputKey = *key;
  const int32_t     bufLen = *pVLen;

  void* pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool found = false;

  // First candidate: the entry at or before the key.
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    bool covers = (key->win.skey <= inputKey.win.skey) && (inputKey.win.ekey <= key->win.ekey);
    if (covers) {
      found = true;
    } else {
      void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
      found = (fn(pKeyData, storedState) == true);
    }
    if (!found) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    // Nothing readable there: restart from the entry after the original key.
    *key = inputKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second candidate: the following entry, matched by state key only.
  if (!found && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
    found = (fn(pKeyData, storedState) == true);
  }

  if (found) {
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = inputKey;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pCur);
  return found ? 0 : 1;
#endif
}
5
54liuyao 已提交
1032

L
Liu Jicong 已提交
1033
/*
 * Store `tag` (tagLen bytes) under `groupId`.
 *
 * With USE_ROCKSDB the call is forwarded to streamStatePutParTag_rocksdb;
 * otherwise the pair is upserted into pParTagDb within the state's txn.
 * Returns whatever the underlying call returns.
 */
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
#ifdef USE_ROCKSDB
  return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen);
#else
  int32_t code =
      tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn);
  return code;
#endif
}

/*
 * Look up the tag stored under `groupId`.
 *
 * With USE_ROCKSDB the call is forwarded to streamStateGetParTag_rocksdb;
 * otherwise it is a tdbTbGet on pParTagDb, which fills *tagVal and *tagLen.
 * Returns whatever the underlying call returns.
 */
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
#ifdef USE_ROCKSDB
  return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen);
#else
  int32_t code = tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
  return code;
#endif
}

1049
/*
 * Store the table name `tbname` under `groupId`.
 *
 * A warning is logged on every call, before the write. With USE_ROCKSDB the
 * call is forwarded to streamStatePutParName_rocksdb; otherwise exactly
 * TSDB_TABLE_NAME_LEN bytes of `tbname` are upserted into pParNameDb within
 * the state's txn. Returns whatever the underlying call returns.
 */
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
  return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
  int32_t code = tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                             pState->pTdbState->txn);
  return code;
#endif
}

/*
 * Look up the table name stored under `groupId`.
 *
 * With USE_ROCKSDB the call is forwarded to streamStateGetParName_rocksdb;
 * otherwise it is a tdbTbGet on pParNameDb, which fills *pVal. The value
 * length reported by tdbTbGet is discarded.
 * Returns whatever the underlying call returns.
 */
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
  return streamStateGetParName_rocksdb(pState, groupId, pVal);
#else
  int32_t unusedLen = 0;  // required by tdbTbGet, not reported to the caller
  int32_t code = tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &unusedLen);
  return code;
#endif
}

/*
 * Release a stream state object.
 *
 * With USE_ROCKSDB, streamStateDestroy_rocksdb is invoked first. Then
 * pState->pTdbState and pState itself are released with taosMemoryFreeClear.
 * A NULL pState is accepted and ignored, so callers need not guard the call.
 */
void streamStateDestroy(SStreamState* pState) {
  if (pState == NULL) {
    return;
  }

#ifdef USE_ROCKSDB
  streamStateDestroy_rocksdb(pState);
#endif
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

5
54liuyao 已提交
1077 1078 1079 1080 1081 1082 1083
#if 0
/*
 * Debug helper: render the session keys stored in pSessionStateDb as text.
 *
 * Starting from the first entry, each readable key is appended as
 * "||s:<skey>,e:<ekey>,g:<groupId>||" to a fixed 2048-byte buffer. Iteration
 * ends at the first entry streamStateSessionGetKVByCur rejects, or as soon as
 * the next entry would not fit; a partially written entry is discarded so the
 * result never ends mid-record.
 *
 * Returns a NUL-terminated buffer obtained from taosMemoryCalloc (the caller
 * is presumably expected to release it - confirm against callers), or NULL
 * when the cursor cannot be created/opened, the first entry cannot be read,
 * or the buffer cannot be allocated.
 */
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t len = 0;  // invariant: 0 <= len < size
  while (code == 0) {
    int32_t avail = size - len;
    int32_t n = snprintf(dumpBuf + len, avail,
                         "||s:%15" PRId64 ","
                         "e:%15" PRId64 ","
                         "g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    if (n < 0 || n >= avail) {
      // Entry did not fit (or formatting failed): drop the partial text and stop.
      dumpBuf[len] = '\0';
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162

/*
 * Debug helper: render the interval window keys stored in pStateDb as text.
 *
 * Starting from the first entry, each readable key is appended as
 * "||s:<ts>,g:<groupId>||" to a fixed 2048-byte buffer. Iteration ends at the
 * first entry streamStateGetKVByCur rejects, or as soon as the next entry
 * would not fit; a partially written entry is discarded so the result never
 * ends mid-record.
 *
 * Returns a NUL-terminated buffer obtained from taosMemoryCalloc (the caller
 * is presumably expected to release it - confirm against callers), or NULL
 * when the cursor cannot be created/opened, the first entry cannot be read,
 * or the buffer cannot be allocated.
 */
char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*   buf = NULL;
  int32_t bufSize = 0;
  int32_t code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t len = 0;  // invariant: 0 <= len < size
  while (code == 0) {
    int32_t avail = size - len;
    int32_t n = snprintf(dumpBuf + len, avail,
                         "||s:%15" PRId64 ","
                         "g:%15" PRId64 "||",
                         key.ts, key.groupId);
    if (n < 0 || n >= avail) {
      // Entry did not fit (or formatting failed): drop the partial text and stop.
      dumpBuf[len] = '\0';
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1163
#endif