streamState.c 33.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
17
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
18
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
19
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
20
#include "streamBackendRocksdb.h"
21
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
26 27
#include "ttimer.h"

L
liuyao 已提交
28
#define MAX_TABLE_NAME_NUM 2000000
L
liuyao 已提交
29

dengyihao's avatar
dengyihao 已提交
30
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
46
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
68
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
69 70 71 72 73 74 75 76 77
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

78
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
79 80
}

dengyihao's avatar
dengyihao 已提交
81
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
82 83 84 85 86 87 88 89 90
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

L
liuyao 已提交
91
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
92 93
}

94 95
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qDebug("open stream state, %s", path);
96 97 98 99 100
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
101

5
54liuyao 已提交
102 103 104
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
dengyihao's avatar
dengyihao 已提交
105
    streamStateDestroy(pState, true);
5
54liuyao 已提交
106 107
    return NULL;
  }
L
Liu Jicong 已提交
108

109
  SStreamTask* pStreamTask = pTask;
110
  char statePath[1024];
L
Liu Jicong 已提交
111
  if (!specPath) {
112
    sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
L
Liu Jicong 已提交
113
  } else {
114 115
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
116
  }
117 118 119 120

  pState->taskId = pStreamTask->id.taskId;
  pState->streamId = pStreamTask->id.streamId;

dengyihao's avatar
dengyihao 已提交
121
#ifdef USE_ROCKSDB
122 123 124
  SStreamMeta* pMeta = pStreamTask->pMeta;
  taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
  int code = streamStateOpenBackend(pMeta->streamBackend, pState);
dengyihao's avatar
dengyihao 已提交
125
  if (code == -1) {
126
    taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
127 128 129
    taosMemoryFree(pState);
    pState = NULL;
  }
130

dengyihao's avatar
dengyihao 已提交
131
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
132
  pState->pFileState = NULL;
L
liuyao 已提交
133
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
dengyihao's avatar
dengyihao 已提交
134

135
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
136 137 138
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
139

L
Liu Jicong 已提交
140
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
141 142
  sprintf(cfgPath, "%s/cfg", statePath);

D
dapan1121 已提交
143 144
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
145 146 147 148
  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
D
dapan1121 已提交
149
    int64_t size = 0;
L
add cfg  
Liu Jicong 已提交
150
    taosFStatFile(pCfgFile, &size, NULL);
D
dapan1121 已提交
151 152 153 154
    if (size > 0) {
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
L
add cfg  
Liu Jicong 已提交
155
  } else {
D
dapan1121 已提交
156 157 158 159 160 161
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      sprintf(cfg, "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
L
add cfg  
Liu Jicong 已提交
162 163 164
  }
  taosCloseFile(&pCfgFile);

165
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
166 167 168 169
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
170 171
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
172 173 174
    goto _err;
  }

5
54liuyao 已提交
175
  // todo refactor
5
54liuyao 已提交
176 177
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
178 179 180
    goto _err;
  }

5
54liuyao 已提交
181 182
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
183 184 185
    goto _err;
  }

5
54liuyao 已提交
186 187
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
188 189 190
    goto _err;
  }

5
54liuyao 已提交
191 192
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
193 194 195
    goto _err;
  }

L
Liu Jicong 已提交
196 197 198 199 200
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

201
  if (streamStateBegin(pState) < 0) {
202 203 204
    goto _err;
  }

5
54liuyao 已提交
205
  pState->pTdbState->pOwner = pTask;
L
liuyao 已提交
206
  pState->checkPointId = 0;
207 208 209 210

  return pState;

_err:
5
54liuyao 已提交
211 212 213 214 215
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
216
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
217
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
218
  streamStateDestroy(pState, false);
219
  return NULL;
dengyihao's avatar
dengyihao 已提交
220
#endif
221 222
}

dengyihao's avatar
dengyihao 已提交
223
// Closes a state store opened by streamStateOpen().
// RocksDB build: destroys the state object and drops the backend reference
// taken at open time. TDB build: commits the open transaction, then closes
// every table and the database (`remove` is not used in that build).
void streamStateClose(SStreamState* pState, bool remove) {
  // read the owner up front; presumably pState is released by streamStateDestroy
  SStreamTask* pOwnerTask = pState->pTdbState->pOwner;
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;

  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);
#else
  streamStateDestroy(pState, remove);
  taosReleaseRef(pOwnerTask->pMeta->streamBackendId, pOwnerTask->pMeta->streamBackendRid);
#endif
}

// Starts a TDB write transaction (read-uncommitted) on the state database.
// Returns 0 on success and -1 on failure (the transaction is aborted first).
// The RocksDB build has nothing to begin and always returns 0.
int32_t streamStateBegin(SStreamState* pState) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;

  int rc = tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc >= 0) {
    return 0;
  }

  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
#else
  return 0;
#endif
}

// Commits the current state and advances checkPointId.
// RocksDB build: flushes a snapshot of the file state (when one exists) and
// returns 0. TDB build: commit + post-commit of the open transaction followed
// by a new write transaction; returns -1 as soon as any step fails, in which
// case checkPointId is left unchanged.
int32_t streamStateCommit(SStreamState* pState) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;

  // short-circuit evaluation keeps the original step order: stop at the first failure
  if (tdbCommit(pTdb->db, pTdb->txn) < 0 || tdbPostCommit(pTdb->db, pTdb->txn) < 0 ||
      tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  pState->checkPointId += 1;
  return 0;
#else
  if (pState->pFileState != NULL) {
    SStreamSnapshot* pSnap = getSnapshot(pState->pFileState);
    flushSnapshot(pState->pFileState, pSnap, true);
  }

  pState->checkPointId += 1;
  return 0;
#endif
}

L
liuyao 已提交
280
// Stores a function-state value for `key`.
// RocksDB build: fetches the row buffer of `key` and copies `value` (vLen
// bytes) into its last `rowSize` bytes, where rowSize comes from
// streamFileStateGeSelectRowSize(). Returns the getRowBuff() result, or a
// non-zero code when no row buffer was obtained.
// TDB build: upserts into the function-state table.
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  void*   pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
  if (pVal == NULL) {
    // no row buffer was produced: report failure instead of dereferencing NULL
    return code != 0 ? code : -1;
  }
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  // NOTE(review): assumes vLen <= rowSize <= len - confirm against callers
  memcpy(buf + len - rowSize, value, vLen);
  return code;
#else
  // NOTE(review): `key` is declared as SWinKey* but sizeof(STupleKey) bytes are
  // handed to TDB - verify that callers really pass an STupleKey-sized object.
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif
}
L
liuyao 已提交
293
// Looks up the function-state value of `key`.
// RocksDB build: fetches the row buffer of `key` and points *ppVal at its last
// `rowSize` bytes (rowSize from streamFileStateGeSelectRowSize()); *pVLen is
// not written in this build. Returns the getRowBuff() result, or a non-zero
// code when no row buffer was obtained.
// TDB build: reads from the function-state table.
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  void*   pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
  if (pVal == NULL) {
    // no row buffer was produced: report failure instead of dereferencing NULL
    return code != 0 ? code : -1;
  }
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  *ppVal = buf + len - rowSize;
  return code;
#else
  // NOTE(review): `key` is declared as SWinKey* but sizeof(STupleKey) bytes are
  // handed to TDB - verify that callers really pass an STupleKey-sized object.
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
#endif
}

307
// todo refactor
308
// Stores `value` under (key, current operator number) in the state table.
// In the RocksDB build this is currently a no-op that returns 0.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;
  SStateKey  stateKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pStateDb, &stateKey, sizeof(SStateKey), value, vLen, pTdb->txn);
#else
  return 0;
#endif
}
5
54liuyao 已提交
317

dengyihao's avatar
dengyihao 已提交
318
// Reads the value stored for `key`.
// RocksDB build: delegates to getRowBuff() on the file state.
// TDB build: looks up (key, current operator number) in the state table.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifndef USE_ROCKSDB
  SStateKey stateKey = {.key = *key, .opNum = pState->number};

  return tdbTbGet(pState->pTdbState->pStateDb, &stateKey, sizeof(SStateKey), pVal, pVLen);
#else
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
#endif
}
5
54liuyao 已提交
326 327

// Reports whether a value exists for `key`.
// RocksDB build: asks the file state via hasRowBuff().
// TDB build: probes the state table for (key, current operator number); the
// fetched copy is released immediately because only existence matters.
bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  void*     pVal = NULL;
  int32_t   vLen = 0;

  // tdbTbGet reports success with 0, so the status must not be returned as a bool directly
  if (tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pVal, &vLen) != 0) {
    return false;
  }

  tdbFree(pVal);
  return true;
#endif
}

// Resolves a row-buffer position to its value through the file state and then
// releases the position. Returns the result of getRowBuffByPos().
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
  int32_t rc = getRowBuffByPos(pState->pFileState, pos, pVal);

  // the position is released regardless of the lookup result
  releaseRowBuffPos(pos);

  return rc;
}

342
// todo refactor
dengyihao's avatar
dengyihao 已提交
343
// Deletes the value stored for `key`.
// RocksDB build: removes the row buffer from the file state.
// TDB build: deletes (key, current operator number) from the state table.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;
  SStateKey  stateKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pStateDb, &stateKey, sizeof(SStateKey), pTdb->txn);
#else
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
#endif
}

// todo refactor
// Stores `value` under `key` in the fill-state store (RocksDB helper or the
// TDB "fill" table, depending on the build).
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;

  return tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(SWinKey), value, vLen, pTdb->txn);
#else
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#endif
}

5
54liuyao 已提交
361 362
// todo refactor
// Reads the value stored under `key` in the fill-state store (RocksDB helper
// or the TDB "fill" table, depending on the build).
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifndef USE_ROCKSDB
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
#else
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#endif
}

370
// todo refactor
dengyihao's avatar
dengyihao 已提交
371
// Deletes `key` from the fill-state store (RocksDB helper or the TDB "fill"
// table, depending on the build).
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifndef USE_ROCKSDB
  STdbState* pTdb = pState->pTdbState;

  return tdbTbDelete(pTdb->pFillStateDb, key, sizeof(SWinKey), pTdb->txn);
#else
  return streamStateFillDel_rocksdb(pState, key);
#endif
}

379
// Removes the state entries of the current operator. Always returns 0.
// RocksDB build: clears the file state and, when needClearDiskBuff() says so,
// the on-disk data as well.
// TDB build: writes a sentinel entry at {ts = 0, groupId = 0}, then repeatedly
// seeks to the entry after the sentinel and deletes it until the lookup fails.
int32_t streamStateClear(SStreamState* pState) {
#ifndef USE_ROCKSDB
  SWinKey sentinel = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &sentinel, NULL, 0);

  for (;;) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &sentinel);
    SWinKey          victim = {0};
    int32_t          rc = streamStateGetKVByCur(pCur, &victim, NULL, 0);
    streamStateFreeCur(pCur);

    if (rc != 0) {
      break;
    }
    streamStateDel(pState, &victim);
  }

  return 0;
#else
  streamFileStateClear(pState->pFileState);
  if (needClearDiskBuff(pState->pFileState)) {
    streamStateClear_rocksdb(pState);
  }
  return 0;
#endif
}

// Sets the operator number that the state and session accessors combine with
// the caller's key when addressing entries.
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

L
fix bug  
liuyao 已提交
406 407 408
// Persists an auxiliary key/value pair in the "default" column family through
// a write batch. Returns 0 on success or the failing step's code.
// keyLen is not used by this implementation. The TDB build is a no-op that
// returns 0.
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
  void* batch = streamStateCreateBatch();

  int32_t code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
  if (code == 0) {
    code = streamStatePutBatch_rocksdb(pState, batch);
  }

  // the batch is released on every path, including when the put above failed
  streamStateDestroyBatch(batch);
  return code;
#else
  return 0;
#endif
}

// Reads an auxiliary value previously stored with streamStateSaveInfo().
// keyLen is not used by this implementation. The TDB build is a no-op that
// returns 0.
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifndef USE_ROCKSDB
  return 0;
#else
  return streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
#endif
}

433
// Returns the value stored for `key`, creating a zero-filled one when absent.
// RocksDB build: plain streamStateGet().
// TDB build: *pVLen is read on entry as the size of the buffer to create; when
// the key is missing a zeroed buffer of that size is allocated into *pVal.
// Returns 0 on success, -1 (terrno set) when that allocation fails.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGet(pState, key, pVal, pVLen);
#else
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  void* pNew = tdbRealloc(NULL, size);
  if (pNew == NULL) {
    // do not memset through a failed allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memset(pNew, 0, size);
  *pVal = pNew;
  return 0;
#endif
}

// Releases a value buffer handed out by the state accessors. A NULL buffer is
// accepted and ignored. pState and key are not used. Always returns 0.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  qDebug("streamStateReleaseBuf");

  if (pVal != NULL) {
#ifdef USE_ROCKSDB
    taosMemoryFree(pVal);
#else
    streamStateFreeVal(pVal);
#endif
  }

  return 0;
}

5
54liuyao 已提交
462
// Opens a cursor on the fill-state store positioned exactly at `key`.
// Returns NULL when the cursor cannot be allocated or opened, when the seek
// fails, or when no entry equal to `key` exists. The caller releases the
// cursor with streamStateFreeCur().
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // same failure handling as the other seek helpers: never use an unopened cursor
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  // c != 0 means the cursor landed on a neighbour rather than on `key` itself
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

// Opens a fill-state cursor at `key` and verifies that the entry under it
// belongs to the same group as `key`. On success `key` is overwritten with the
// stored key and the cursor is returned; otherwise NULL is returned.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) == 0) {
    return pCur;
  }

  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#endif
}

496
// Reads the key/value under a state-table cursor.
// Returns -1 for a NULL cursor, when the cursor read fails, or when the entry
// belongs to a different operator number than the cursor; otherwise copies the
// window key into *pKey and returns 0.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifndef USE_ROCKSDB
  if (pCur == NULL) {
    return -1;
  }

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) {
    return -1;
  }

  // entries of other operators share the table; they are not ours to return
  if (pStored->opNum != pCur->number) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#else
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#endif
}

// Reads the key/value under a fill-state cursor.
// Returns -1 for a NULL cursor or when the cursor read fails; otherwise copies
// the stored key into *pKey and returns 0.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifndef USE_ROCKSDB
  if (pCur == NULL) {
    return -1;
  }

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) {
    return -1;
  }

  *pKey = *pStored;
  return 0;
#else
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#endif
}

5
54liuyao 已提交
533
// Like streamStateFillGetKVByCur(), but succeeds only when the entry under the
// cursor has the groupId that *pKey carried on entry. *pKey is overwritten by
// the stored key whenever the underlying read succeeds, even if the group
// check then fails.
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifndef USE_ROCKSDB
  if (pCur == NULL) {
    return -1;
  }

  // remember the requested group before pKey is overwritten by the read
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }

  return (pKey->groupId == wantedGroup) ? 0 : -1;
#else
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#endif
}

551
// Fetches the first state key of the current operator into *key.
// TDB build: temporarily inserts a sentinel at {ts = 0, groupId = 0}, reads the
// entry that follows it, then removes the sentinel again. Returns the result
// of that read.
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifndef USE_ROCKSDB
  // todo refactor
  SWinKey sentinel = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &sentinel, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &sentinel);
  int32_t          rc = streamStateGetKVByCur(pCur, key, NULL, 0);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &sentinel);
  return rc;
#else
  return streamStateGetFirst_rocksdb(pState, key);
#endif
}

566
// Moves the cursor to the first entry. The RocksDB build always returns 0; the
// TDB build returns the result of tdbTbcMoveToFirst(). pState is not used.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifndef USE_ROCKSDB
  return tdbTbcMoveToFirst(pCur->pCur);
#else
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#endif
}

// Moves the cursor to the last entry. The RocksDB build always returns 0; the
// TDB build returns the result of tdbTbcMoveToLast(). pState is not used.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifndef USE_ROCKSDB
  return tdbTbcMoveToLast(pCur->pCur);
#else
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#endif
}

584
// Opens a state-table cursor positioned relative to (key, current operator
// number): the cursor is returned as-is when the seek reports c > 0, otherwise
// it is advanced by one entry first. Returns NULL on any failure. The caller
// releases the cursor with streamStateFreeCur().
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateKey target = {.key = *key, .opNum = pState->number};
  int32_t   cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateKey), &cmp) < 0) goto _fail;

  if (cmp > 0) {
    return pCur;
  }

  if (tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateSeekKeyNext_rocksdb(pState, key);
#endif
}

5
54liuyao 已提交
615
// Opens a fill-state cursor positioned relative to `key`: the cursor is
// returned as-is when the seek reports c > 0, otherwise it is advanced by one
// entry first. Returns NULL on any failure. The caller releases the cursor
// with streamStateFreeCur().
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  if (cmp > 0) {
    return pCur;
  }

  if (tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#endif
}

5
54liuyao 已提交
644
// Opens a fill-state cursor positioned relative to `key`: the cursor is
// returned as-is when the seek reports c < 0, otherwise it is moved back by
// one entry first. Returns NULL on any failure. The caller releases the cursor
// with streamStateFreeCur().
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  if (cmp < 0) {
    return pCur;
  }

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#endif
}

// Advances the cursor by one entry. In the TDB build a NULL cursor yields -1.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifndef USE_ROCKSDB
  return (pCur != NULL) ? tdbTbcMoveToNext(pCur->pCur) : -1;
#else
  return streamStateCurNext_rocksdb(pState, pCur);
#endif
}

// Moves the cursor back by one entry. In the TDB build a NULL cursor yields -1.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifndef USE_ROCKSDB
  return (pCur != NULL) ? tdbTbcMoveToPrev(pCur->pCur) : -1;
#else
  return streamStateCurPrev_rocksdb(pState, pCur);
#endif
}
695
// Releases a cursor and every resource attached to it. NULL is accepted.
// A cursor carries either RocksDB handles (iter / snapshot / readOpt) or a TDB
// cursor (pCur), never necessarily both; the TDB-side helpers in this file
// calloc the cursor and only set pCur, so each handle is checked before it is
// handed to its destroy function.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }
  qDebug("streamStateFreeCur");

  if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
  if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);

  if (pCur->pCur) tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

708
// Frees a value buffer with the allocator that matches the storage build:
// tdbFree() for TDB, taosMemoryFree() for RocksDB.
void streamStateFreeVal(void* val) {
#ifndef USE_ROCKSDB
  tdbFree(val);
#else
  taosMemoryFree(val);
#endif
}
5
54liuyao 已提交
715 716

// Stores `value` under (session key, current operator number) in the session
// state store.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifndef USE_ROCKSDB
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sessKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pSessionStateDb, &sessKey, sizeof(SStateSessionKey), value, vLen, pTdb->txn);
#else
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#endif
}

// Looks up the session whose start key equals key->win.skey.
// TDB build: seeks via streamStateSessionSeekKeyCurrentNext(); on a match *key
// is replaced by the stored key and *pVal receives a freshly allocated copy of
// the value (*pVLen bytes) that the caller must release. Returns 0 on success
// and -1 when no matching session exists or the copy cannot be allocated
// (terrno set in the latter case).
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // copy out of the cursor-owned memory before the cursor is closed below
      void* pCopy = tdbRealloc(NULL, *pVLen);
      if (pCopy == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = -1;
      } else {
        memcpy(pCopy, tmp, *pVLen);
        *key = resKey;
        *pVal = pCopy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
#endif
}

// Deletes (session key, current operator number) from the session state store.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifndef USE_ROCKSDB
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sessKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pSessionStateDb, &sessKey, sizeof(SStateSessionKey), pTdb->txn);
#else
  return streamStateSessionDel_rocksdb(pState, key);
#endif
}

758
// Opens a session-state cursor positioned relative to (key, current operator
// number): the cursor is returned as-is when the seek reports c >= 0,
// otherwise it is moved back by one entry first. Returns NULL on any failure.
// The caller releases the cursor with streamStateFreeCur().
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  if (cmp >= 0) {
    return pCur;
  }

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#endif
}

789
// Opens a session-state cursor positioned relative to (key, current operator
// number): the cursor is returned as-is when the seek reports c <= 0,
// otherwise it is advanced by one entry first. Returns NULL on any failure.
// The caller releases the cursor with streamStateFreeCur().
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifndef USE_ROCKSDB
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  if (cmp <= 0) {
    return pCur;
  }

  if (tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#else
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#endif
}

5
54liuyao 已提交
821
/*
 * Open a cursor on the session state table and seek to `key` (tagged with this
 * operator's number). If tdbTbcMoveTo reports a negative comparison result the
 * cursor is returned as-is; for zero or a positive result it is advanced by
 * one entry first.
 *
 * Returns a newly allocated cursor, or NULL if the allocation or any cursor
 * operation fails. The caller releases it with streamStateFreeCur().
 */
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCursor) {
    return NULL;
  }
  pCursor->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  bool ok = tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCursor->pCur, NULL) >= 0 &&
            tdbTbcMoveTo(pCursor->pCur, &target, sizeof(SStateSessionKey), &cmp) >= 0;

  // Zero or positive comparison result: step to the following entry.
  if (ok && cmp >= 0) {
    ok = tdbTbcMoveToNext(pCursor->pCur) >= 0;
  }

  if (!ok) {
    streamStateFreeCur(pCursor);
    return NULL;
  }
  return pCursor;
#endif
}

852
/*
 * Read the session key/value pair at the cursor position.
 *
 * pKey is both input and output: a non-zero pKey->groupId on entry acts as a
 * filter (the stored entry must carry the same groupId); on success *pKey is
 * overwritten with the stored session key. pVal / pVLen are forwarded to
 * tdbTbcGet unchanged.
 *
 * Returns 0 on success, -1 if pCur is NULL, the read fails, the entry belongs
 * to a different operator number, or the groupId filter does not match.
 */
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen = 0;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }

  bool sameOperator = (pStored->opNum == pCur->number);
  bool groupMatches = (pKey->groupId == 0) || (pKey->groupId == pStored->key.groupId);
  if (!sameOperator || !groupMatches) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

/*
 * Zero the value of every session entry reachable from the all-zero key.
 *
 * Walks the session table with a cursor; each entry with a non-empty value has
 * its value bytes set to zero and is written back via streamStateSessionPut.
 * The walk stops at the first entry that cannot be read or has an empty value.
 * Always returns 0.
 */
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey      firstKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &firstKey);

  for (;;) {
    SSessionKey curKey = {0};
    void*       pData = NULL;
    int32_t     dataLen = 0;

    if (streamStateSessionGetKVByCur(pCur, &curKey, &pData, &dataLen) != 0 || dataLen <= 0) {
      break;
    }

    memset(pData, 0, dataLen);
    streamStateSessionPut(pState, &curKey, pData, dataLen);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
#endif
}

899
/*
 * Find a stored session whose window overlaps `key` (as judged by
 * sessionRangeKeyCmpr) and copy its key into *curKey.
 *
 * The entry where the seek lands is tried first. If it does not match, one
 * neighbour is tried: the next entry when tdbTbcMoveTo reported a positive
 * comparison result, the previous entry when it reported a negative one.
 *
 * Returns 0 when a match is found, -1 otherwise (including allocation or
 * cursor failures).
 */
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return -1;
  }
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0 ||
      tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &cmp) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  // Candidate #1: the entry the seek landed on.
  SSessionKey found = *key;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &found, NULL, 0);
  bool        hit = (code == 0) && (sessionRangeKeyCmpr(key, &found) == 0);

  // Candidate #2: one neighbour, direction chosen by the seek's comparison result.
  if (!hit && cmp != 0) {
    if (cmp > 0) {
      streamStateCurNext(pState, pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    code = streamStateSessionGetKVByCur(pCur, &found, NULL, 0);
    hit = (code == 0) && (sessionRangeKeyCmpr(key, &found) == 0);
  }

  if (hit) {
    *curKey = found;
  }
  streamStateFreeCur(pCur);
  return hit ? code : -1;
#endif
}

951 952
/*
 * Look for an existing session that overlaps `key` widened by `gap` on both
 * sides; if one exists, take it over, otherwise hand back an empty value.
 *
 * On entry *pVLen is the value size; a buffer of that size is allocated with
 * tdbRealloc and returned through *pVal in every non-error case.
 *
 *  - Match found: the stored value is copied into the buffer, the stored
 *    entry is deleted, *key holds the stored session key, and 0 is returned.
 *  - No match: the buffer is zero-filled, *key is restored to its input
 *    value, and 1 is returned.
 *  - Allocation failure: returns -1.
 *
 * Two candidates are examined: the entry at/before `key`, then the entry that
 * follows it (or, when the first lookup fails, the entry found by
 * streamStateSessionSeekKeyNext).
 */
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
  // todo refactor
  const SSessionKey inputKey = *key;
  const int32_t     bufLen = *pVLen;

  SSessionKey range = inputKey;
  range.win.skey = inputKey.win.skey - gap;
  range.win.ekey = inputKey.win.ekey + gap;

  void* pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool             merged = false;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);

  // First candidate: the entry at or before the requested key.
  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    merged = (sessionRangeKeyCmpr(&range, key) == 0);
    if (!merged) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = inputKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second candidate: only consulted when the first one did not match.
  if (!merged && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    merged = (sessionRangeKeyCmpr(&range, key) == 0);
  }

  int32_t res = 0;
  if (merged) {
    // Copy before deleting: *pVal refers to the stored value.
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = inputKey;
    res = 1;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pCur);
  return res;
#endif
}

/*
 * State-window variant of "add if not exist".
 *
 * On entry *pVLen is the value size; a buffer of that size is allocated with
 * tdbRealloc and returned through *pVal in every non-error case. The state
 * key of a stored entry is read from the last `keyDataLen` bytes of its value
 * and compared against pKeyData with `fn`.
 *
 *  - First candidate (entry at/before `key`): accepted when its window fully
 *    contains the input window, or when `fn` reports equal state keys.
 *  - Second candidate (the following entry, or the one found by
 *    streamStateSessionSeekKeyNext when the first lookup fails): accepted
 *    only when `fn` reports equal state keys.
 *
 * On acceptance the stored value is copied out, the stored entry is deleted,
 * *key holds the stored session key, and 0 is returned. Otherwise the buffer
 * is zero-filled, *key is restored, and 1 is returned. Returns -1 when the
 * buffer cannot be allocated.
 */
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  const SSessionKey inputKey = *key;
  const int32_t     bufLen = *pVLen;

  void* pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool             matched = false;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);

  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    bool contains = (key->win.skey <= inputKey.win.skey) && (inputKey.win.ekey <= key->win.ekey);
    if (contains) {
      matched = true;
    } else {
      // The state key occupies the tail of the stored value.
      void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
      matched = (fn(pKeyData, storedState) == true);
    }
    if (!matched) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = inputKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  if (!matched && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
    matched = (fn(pKeyData, storedState) == true);
  }

  int32_t res = 0;
  if (matched) {
    // Copy before deleting: *pVal refers to the stored value.
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = inputKey;
    res = 1;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pCur);
  return res;
#endif
}
5
54liuyao 已提交
1064

1065
/*
 * Record the table name associated with `groupId`.
 *
 * RocksDB build: while the in-memory map holds at most MAX_TABLE_NAME_NUM
 * entries the name is stored in the map. Once the map is larger than that, a
 * groupId that is not in the map is written to the rocksdb backend instead,
 * and a groupId that is already in the map is left untouched. Always returns
 * TSDB_CODE_SUCCESS.
 *
 * TDB build: upserts into the par-name table and returns tdbTbUpsert's result.
 */
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
  bool mapIsFull = tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM;

  if (!mapIsFull) {
    tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
  } else if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
    streamStatePutParName_rocksdb(pState, groupId, tbname);
  }
  return TSDB_CODE_SUCCESS;
#else
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
#endif
}

/*
 * Look up the table name associated with `groupId`.
 *
 * RocksDB build: the in-memory map is consulted first. On a hit, *pVal
 * receives a newly allocated TSDB_TABLE_NAME_LEN-byte copy of the name (the
 * caller owns it) and TSDB_CODE_SUCCESS is returned. On a miss the rocksdb
 * backend is queried only when the map is larger than MAX_TABLE_NAME_NUM;
 * otherwise TSDB_CODE_FAILED is returned. TSDB_CODE_FAILED is also returned,
 * with *pVal untouched, when the copy cannot be allocated.
 *
 * TDB build: returns the result of tdbTbGet on the par-name table.
 */
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }

  // Check the allocation before copying: memcpy into NULL is undefined behavior.
  void* pCopy = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCopy == NULL) {
    return TSDB_CODE_FAILED;
  }
  memcpy(pCopy, pStr, TSDB_TABLE_NAME_LEN);
  *pVal = pCopy;
  return TSDB_CODE_SUCCESS;
#else
  int32_t len = 0;
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
#endif
}

dengyihao's avatar
dengyihao 已提交
1100
/*
 * Release a stream state object and everything it owns.
 *
 * A NULL pState is accepted and ignored. In the RocksDB build the file state,
 * the rocksdb-side state (`remove` is forwarded to streamStateDestroy_rocksdb)
 * and the par-name map are torn down first; `remove` is unused otherwise.
 * Finally pTdbState and pState itself are freed.
 */
void streamStateDestroy(SStreamState* pState, bool remove) {
  if (pState == NULL) {
    return;
  }

#ifdef USE_ROCKSDB
  streamFileStateDestroy(pState->pFileState);
  streamStateDestroy_rocksdb(pState, remove);
  tSimpleHashCleanup(pState->parNameMap);
#endif
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

L
liuyao 已提交
1111 1112 1113 1114 1115 1116 1117 1118
/*
 * Delete expired checkpoints for this state.
 *
 * In the RocksDB build this forwards `mark` to deleteExpiredCheckPoint on the
 * file state and returns its result; in other builds it does nothing and
 * returns 0.
 */
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  code = deleteExpiredCheckPoint(pState->pFileState, mark);
#endif
  return code;
}

5
54liuyao 已提交
1119 1120 1121 1122 1123 1124 1125
#if 0
/*
 * Debug helper: render the keys of the session state table as text.
 *
 * Starting at the first entry of the table, each key readable through
 * streamStateSessionGetKVByCur is appended as
 * "||s:<skey>,e:<ekey>,g:<groupId>||". Output is truncated once the fixed
 * 2048-byte buffer is full; the result is always NUL-terminated.
 *
 * Returns a heap buffer the caller must free, or NULL if an allocation fails,
 * the cursor cannot be opened, or the first entry cannot be read.
 */
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    int64_t room = size - len;
    int32_t n = snprintf(dumpBuf + len, (size_t)room, "||s:%15" PRId64 ",e:%15" PRId64 ",g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    if (n < 0) {
      break;  // formatting error: keep what has been written so far
    }
    if (n >= room) {
      break;  // snprintf truncated and NUL-terminated; the buffer is full
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};  // a zero groupId disables the group filter
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204

/*
 * Debug helper: render the keys of the interval state table as text.
 *
 * Starting at the first entry of the table, each key readable through
 * streamStateGetKVByCur is appended as "||s:<ts>,g:<groupId>||". Output is
 * truncated once the fixed 2048-byte buffer is full; the result is always
 * NUL-terminated.
 *
 * Returns a heap buffer the caller must free, or NULL if an allocation fails,
 * the cursor cannot be opened, or the first entry cannot be read.
 */
char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*   buf = NULL;
  int32_t bufSize = 0;
  int32_t code = streamStateGetKVByCur(pCur, &key, (const void**)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    int64_t room = size - len;
    int32_t n = snprintf(dumpBuf + len, (size_t)room, "||s:%15" PRId64 ",g:%15" PRId64 "||", key.ts, key.groupId);
    if (n < 0) {
      break;  // formatting error: keep what has been written so far
    }
    if (n >= room) {
      break;  // snprintf truncated and NUL-terminated; the buffer is full
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1205
#endif