streamState.c 31.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

L
liuyao 已提交
29 30
#define MAX_TABLE_NAME_NUM  100000

dengyihao's avatar
dengyihao 已提交
31
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
47
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
69
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
70 71 72 73 74 75 76 77 78
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

79
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
80 81
}

dengyihao's avatar
dengyihao 已提交
82
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

  if (pWin1->key.ts > pWin2->key.ts) {
    return 1;
  } else if (pWin1->key.ts < pWin2->key.ts) {
    return -1;
  }

  if (pWin1->key.groupId > pWin2->key.groupId) {
    return 1;
  } else if (pWin1->key.groupId < pWin2->key.groupId) {
    return -1;
  }

  return 0;
}

107
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
dengyihao's avatar
dengyihao 已提交
108
  qWarn("open stream state, %s", path);
109 110 111 112 113
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
114 115 116 117 118 119
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }
L
Liu Jicong 已提交
120

121
  char statePath[1024];
L
Liu Jicong 已提交
122 123 124
  if (!specPath) {
    sprintf(statePath, "%s/%d", path, pTask->taskId);
  } else {
125 126
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
127
  }
dengyihao's avatar
dengyihao 已提交
128 129 130 131 132 133 134 135 136
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    taosMemoryFree(pState);
    pState = NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
137
  pState->pFileState = NULL;
L
liuyao 已提交
138 139
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
140 141 142
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157
  sprintf(cfgPath, "%s/cfg", statePath);

  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    int64_t size;
    taosFStatFile(pCfgFile, &size, NULL);
    taosReadFile(pCfgFile, cfg, size);
    sscanf(cfg, "%d\n%d\n", &szPage, &pages);
  } else {
    taosMulModeMkDir(statePath, 0755);
    pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
L
Liu Jicong 已提交
158 159
    szPage = szPage < 0 ? 4096 : szPage;
    pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
160 161 162 163 164
    sprintf(cfg, "%d\n%d\n", szPage, pages);
    taosWriteFile(pCfgFile, cfg, strlen(cfg));
  }
  taosCloseFile(&pCfgFile);

165
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
166 167 168 169
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
170 171
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
172 173 174
    goto _err;
  }

5
54liuyao 已提交
175
  // todo refactor
5
54liuyao 已提交
176 177
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
178 179 180
    goto _err;
  }

5
54liuyao 已提交
181 182
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
183 184 185
    goto _err;
  }

5
54liuyao 已提交
186 187
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
188 189 190
    goto _err;
  }

5
54liuyao 已提交
191 192
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
193 194 195
    goto _err;
  }

L
Liu Jicong 已提交
196 197 198 199 200
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

201
  if (streamStateBegin(pState) < 0) {
202 203 204
    goto _err;
  }

5
54liuyao 已提交
205
  pState->pTdbState->pOwner = pTask;
206 207 208 209

  return pState;

_err:
5
54liuyao 已提交
210 211 212 213 214
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
215
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
216 217
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
218
  return NULL;
dengyihao's avatar
dengyihao 已提交
219
#endif
220 221 222
}

void streamStateClose(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
223
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
224
  streamCleanBackend(pState);
dengyihao's avatar
dengyihao 已提交
225
#else
226 227
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
228 229 230 231 232
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
233
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
234
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
235
#endif
5
54liuyao 已提交
236
  streamStateDestroy(pState);
237 238 239
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
240 241 242
#ifdef USE_ROCKSDB
  return 0;
#else
243
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
244
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
245
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
246 247 248
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
249
#endif
250 251 252
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
253
#ifdef USE_ROCKSDB
L
liuyao 已提交
254 255
  SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
  flushSnapshot(pState->pFileState, pShot, true);
dengyihao's avatar
dengyihao 已提交
256 257
  return 0;
#else
258
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
259 260
    return -1;
  }
261
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
262 263
    return -1;
  }
264

265
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
266
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
267 268 269
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
270
#endif
271 272
}

273
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
274 275 276
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
277
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
278
#endif
279 280
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
281
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
282
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
283
#else
5
54liuyao 已提交
284
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
285
#endif
286 287 288
}

int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
dengyihao's avatar
dengyihao 已提交
289 290 291
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
292
  return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
293
#endif
294 295
}

296
// todo refactor
297
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
298
#ifdef USE_ROCKSDB
5
54liuyao 已提交
299 300
  return 0;
  // return streamStatePut_rocksdb(pState, key, value, vLen);
dengyihao's avatar
dengyihao 已提交
301
#else
302
  SStateKey sKey = {.key = *key, .opNum = pState->number};
303
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
304
#endif
305
}
5
54liuyao 已提交
306

dengyihao's avatar
dengyihao 已提交
307
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
308
#ifdef USE_ROCKSDB
5
54liuyao 已提交
309
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
310
#else
dengyihao's avatar
dengyihao 已提交
311 312
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
313
#endif
5
54liuyao 已提交
314
}
5
54liuyao 已提交
315 316 317 318 319 320 321 322 323 324 325

bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
  #ifdef USE_ROCKSDB
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}

int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
L
liuyao 已提交
326 327 328
  int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
  releaseRowBuffPos(pos);
  return code;
5
54liuyao 已提交
329 330
}

331
// todo refactor
dengyihao's avatar
dengyihao 已提交
332
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
333
#ifdef USE_ROCKSDB
5
54liuyao 已提交
334
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
dengyihao's avatar
dengyihao 已提交
335
#else
336
  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
337 338 339 340 341 342 343 344 345 346
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
347
#endif
348 349
}

5
54liuyao 已提交
350 351
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
352 353 354
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
355
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
356
#endif
5
54liuyao 已提交
357 358
}

359
// todo refactor
dengyihao's avatar
dengyihao 已提交
360
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
361
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
362
  return streamStateFillDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
363
#else
dengyihao's avatar
dengyihao 已提交
364
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
365
#endif
366 367
}

368
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
369
#ifdef USE_ROCKSDB
5
54liuyao 已提交
370
  streamFileStateClear(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
371 372
  return streamStateClear_rocksdb(pState);
#else
373 374 375 376
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
377 378
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
379
    streamStateFreeCur(pCur);
380 381 382 383 384 385 386
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
387
#endif
388 389 390 391
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

392
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
393
#ifdef USE_ROCKSDB
5
54liuyao 已提交
394
  return streamStateGet(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
395
#else
396 397 398 399 400
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
401 402 403
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
404
#endif
405 406 407 408
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
409
  qDebug("streamStateReleaseBuf");
5
54liuyao 已提交
410 411 412
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
413 414 415
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
416
  streamFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
417
#endif
418 419 420
  return 0;
}

5
54liuyao 已提交
421
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
422 423 424
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
425 426
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
427
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
428

5
54liuyao 已提交
429
  int32_t c = 0;
5
54liuyao 已提交
430 431
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
432
    streamStateFreeCur(pCur);
5
54liuyao 已提交
433 434 435
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
436
#endif
5
54liuyao 已提交
437 438 439
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
440 441 442
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
443 444 445 446 447 448
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
449
    streamStateFreeCur(pCur);
5
54liuyao 已提交
450 451
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
452
#endif
5
54liuyao 已提交
453 454
}

455
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
456 457 458
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
459 460 461 462
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
463
  int32_t kLen;
464 465 466 467 468 469 470 471
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
472
#endif
473 474 475
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
476 477 478
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
479 480 481
  if (!pCur) {
    return -1;
  }
482
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
483
  int32_t kLen;
484 485 486 487 488
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
489
#endif
490 491
}

5
54liuyao 已提交
492
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
493 494 495
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
496 497 498
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
499
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
500
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
501 502 503 504 505 506
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
507
#endif
5
54liuyao 已提交
508 509
}

510
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
511 512 513
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
514 515 516 517
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
518
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
519
  streamStateFreeCur(pCur);
520 521
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
522
#endif
523 524
}

525
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
526 527 528 529
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
530
  return tdbTbcMoveToFirst(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
531
#endif
532 533 534
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
535 536 537 538
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
539
  return tdbTbcMoveToLast(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
540
#endif
541 542
}

543
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
544 545 546
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
547 548 549 550 551
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
552
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
553
    streamStateFreeCur(pCur);
554 555 556 557
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
558
  int32_t c = 0;
559
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
560
    streamStateFreeCur(pCur);
561 562 563 564 565
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
566
    streamStateFreeCur(pCur);
567 568 569 570
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
571
#endif
572 573
}

5
54liuyao 已提交
574
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
575 576 577
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
578
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
579
  if (!pCur) {
580 581
    return NULL;
  }
5
54liuyao 已提交
582
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
583
    streamStateFreeCur(pCur);
5
54liuyao 已提交
584 585
    return NULL;
  }
586

5
54liuyao 已提交
587
  int32_t c = 0;
588
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
589
    streamStateFreeCur(pCur);
590 591 592 593 594
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
595
    streamStateFreeCur(pCur);
596 597 598 599
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
600
#endif
601 602
}

5
54liuyao 已提交
603
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
604 605 606
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
607 608 609 610
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
611
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
612
    streamStateFreeCur(pCur);
5
54liuyao 已提交
613 614
    return NULL;
  }
615

5
54liuyao 已提交
616
  int32_t c = 0;
617
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
618
    streamStateFreeCur(pCur);
619 620 621 622 623
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
624
    streamStateFreeCur(pCur);
625 626 627 628
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
629
#endif
630 631 632
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
633 634 635
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
636 637 638
  if (!pCur) {
    return -1;
  }
639 640
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
641
#endif
642 643 644
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
645 646 647
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
648 649 650
  if (!pCur) {
    return -1;
  }
651
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
652
#endif
653
}
654
void streamStateFreeCur(SStreamStateCur* pCur) {
655 656 657
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
658
  qDebug("streamStateFreeCur");
dengyihao's avatar
dengyihao 已提交
659
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
660
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
dengyihao's avatar
dengyihao 已提交
661 662
  rocksdb_readoptions_destroy(pCur->readOpt);

dengyihao's avatar
dengyihao 已提交
663
  tdbTbcClose(pCur->pCur);
664 665 666
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
667 668 669 670 671 672 673
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
674 675

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
676 677 678
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
679
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
680
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
681
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
682
#endif
5
54liuyao 已提交
683 684 685
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
686 687 688 689
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

690
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
691 692 693
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
694
  if (code == 0) {
5
54liuyao 已提交
695 696 697 698 699 700 701
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
702 703
  }
  streamStateFreeCur(pCur);
704
  return code;
dengyihao's avatar
dengyihao 已提交
705
#endif
5
54liuyao 已提交
706 707 708
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
709 710 711
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
712
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
713
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
714
#endif
5
54liuyao 已提交
715 716
}

717
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
718 719 720
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
721 722 723 724 725
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
726
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
727 728 729 730 731
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
732
  int32_t c = 0;
5
54liuyao 已提交
733 734 735 736
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
737
  if (c >= 0) return pCur;
5
54liuyao 已提交
738 739 740 741 742 743 744

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
745
#endif
5
54liuyao 已提交
746 747
}

748
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
749 750 751
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
752 753 754 755 756
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
757
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
758 759 760 761 762
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
763
  int32_t c = 0;
764 765 766 767 768 769 770 771 772 773 774 775 776
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
777
#endif
778 779
}

5
54liuyao 已提交
780
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
781 782 783
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
784 785 786 787 788
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
789
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
790 791 792 793 794
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
795
  int32_t c = 0;
5
54liuyao 已提交
796 797 798 799
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
800
  if (c < 0) return pCur;
5
54liuyao 已提交
801 802 803 804 805 806 807

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
808
#endif
5
54liuyao 已提交
809 810
}

811
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
812 813 814
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
815 816 817
  if (!pCur) {
    return -1;
  }
818
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
819
  int32_t kLen;
820
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
821 822 823 824 825 826 827 828 829 830
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
831
#endif
5
54liuyao 已提交
832 833 834
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
835 836 837 838
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
839
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
840 841
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
842 843 844
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
845
    if (code == 0 && size > 0) {
5
54liuyao 已提交
846 847 848 849 850 851 852 853 854
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
855
#endif
5
54liuyao 已提交
856 857
}

858
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
859 860 861
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
862 863 864 865 866
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
867
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
868 869
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
870 871
  }

872
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
873
  int32_t c = 0;
874 875 876 877 878 879
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
880
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
902 903
    }
  }
904

905
  streamStateFreeCur(pCur);
906
  return -1;
dengyihao's avatar
dengyihao 已提交
907
#endif
908 909
}

910 911
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
912 913 914
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
915
  // todo refactor
dengyihao's avatar
dengyihao 已提交
916
  int32_t res = 0;
917 918 919 920 921
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
922
  void* tmp = tdbRealloc(NULL, valSize);
923 924 925 926 927
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
928
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
929 930 931 932 933 934 935 936 937
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
938
    streamStateFreeCur(pCur);
939
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
940
  }
941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
958
  streamStateFreeCur(pCur);
959
  return res;
dengyihao's avatar
dengyihao 已提交
960 961

#endif
5
54liuyao 已提交
962 963 964 965 966
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
967 968 969 970 971

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
972
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
973 974
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
975 976 977 978
  if (!tmp) {
    return -1;
  }

979
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
980
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
981 982 983
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
984
      streamStateSessionDel(pState, key);
985 986
      goto _end;
    }
5
54liuyao 已提交
987 988 989 990

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
991
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
992 993
      goto _end;
    }
5
54liuyao 已提交
994 995 996 997 998 999

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
1000 1001
  }

1002
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1003
  if (code == 0) {
5
54liuyao 已提交
1004 1005 1006
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1007
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1021
#endif
5
54liuyao 已提交
1022
}
5
54liuyao 已提交
1023

1024
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
dengyihao's avatar
dengyihao 已提交
1025
  qWarn("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1026
#ifdef USE_ROCKSDB
L
liuyao 已提交
1027 1028 1029 1030 1031 1032 1033 1034
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1035
#else
L
Liu Jicong 已提交
1036 1037
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1038
#endif
1039 1040 1041
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1042
#ifdef USE_ROCKSDB
L
liuyao 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }
  *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1053
#else
1054
  int32_t len;
5
54liuyao 已提交
1055
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1056
#endif
5
54liuyao 已提交
1057 1058 1059
}

void streamStateDestroy(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
1060
#ifdef USE_ROCKSDB
5
54liuyao 已提交
1061
  streamFileStateDestroy(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
1062 1063 1064
  streamStateDestroy_rocksdb(pState);
  // do nothong
#endif
5
54liuyao 已提交
1065 1066
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1067 1068
}

5
54liuyao 已提交
1069 1070 1071 1072 1073 1074 1075
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1076
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1077 1078 1079 1080 1081 1082
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1083 1084 1085
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1086
  if (code != 0) {
1087
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1102
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1103 1104 1105 1106 1107 1108
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1109
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1110 1111
  return dumpBuf;
}
5
54liuyao 已提交
1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1155
#endif