streamState.c 33.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
17
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
18
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
19
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
20
#include "streamBackendRocksdb.h"
21
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
26

L
liuyao 已提交
27
#define MAX_TABLE_NAME_NUM 200000
L
liuyao 已提交
28

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
81 82 83 84 85 86 87 88 89
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

L
liuyao 已提交
90
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
91 92
}

93 94
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qDebug("open stream state, %s", path);
95 96 97 98 99
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
100

5
54liuyao 已提交
101 102 103
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
dengyihao's avatar
dengyihao 已提交
104
    streamStateDestroy(pState, true);
5
54liuyao 已提交
105 106
    return NULL;
  }
L
Liu Jicong 已提交
107

108
  SStreamTask* pStreamTask = pTask;
109
  char statePath[1024];
L
Liu Jicong 已提交
110
  if (!specPath) {
111
    sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
L
Liu Jicong 已提交
112
  } else {
113 114
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
115
  }
116 117 118 119

  pState->taskId = pStreamTask->id.taskId;
  pState->streamId = pStreamTask->id.streamId;

dengyihao's avatar
dengyihao 已提交
120
#ifdef USE_ROCKSDB
121 122 123
  SStreamMeta* pMeta = pStreamTask->pMeta;
  taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
  int code = streamStateOpenBackend(pMeta->streamBackend, pState);
dengyihao's avatar
dengyihao 已提交
124
  if (code == -1) {
125
    taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
126 127 128
    taosMemoryFree(pState);
    pState = NULL;
  }
129

dengyihao's avatar
dengyihao 已提交
130
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
131
  pState->pFileState = NULL;
L
liuyao 已提交
132
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
dengyihao's avatar
dengyihao 已提交
133

134
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
135 136 137
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
140 141
  sprintf(cfgPath, "%s/cfg", statePath);

D
dapan1121 已提交
142 143
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
144 145 146 147
  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
D
dapan1121 已提交
148
    int64_t size = 0;
L
add cfg  
Liu Jicong 已提交
149
    taosFStatFile(pCfgFile, &size, NULL);
D
dapan1121 已提交
150 151 152 153
    if (size > 0) {
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
L
add cfg  
Liu Jicong 已提交
154
  } else {
D
dapan1121 已提交
155 156 157 158 159 160
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      sprintf(cfg, "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
L
add cfg  
Liu Jicong 已提交
161 162 163
  }
  taosCloseFile(&pCfgFile);

164
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
165 166 167 168
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
169 170
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
171 172 173
    goto _err;
  }

5
54liuyao 已提交
174
  // todo refactor
5
54liuyao 已提交
175 176
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
177 178 179
    goto _err;
  }

5
54liuyao 已提交
180 181
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
182 183 184
    goto _err;
  }

5
54liuyao 已提交
185 186
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
187 188 189
    goto _err;
  }

5
54liuyao 已提交
190 191
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
192 193 194
    goto _err;
  }

L
Liu Jicong 已提交
195 196 197 198 199
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

200
  if (streamStateBegin(pState) < 0) {
201 202 203
    goto _err;
  }

5
54liuyao 已提交
204
  pState->pTdbState->pOwner = pTask;
L
liuyao 已提交
205
  pState->checkPointId = 0;
206 207 208 209

  return pState;

_err:
5
54liuyao 已提交
210 211 212 213 214
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
215
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
216
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
217
  streamStateDestroy(pState, false);
218
  return NULL;
dengyihao's avatar
dengyihao 已提交
219
#endif
220 221
}

dengyihao's avatar
dengyihao 已提交
222
// Closes a state opened by streamStateOpen.
// RocksDB build: destroys the state (forwarding `remove`) and then releases
// the stream backend reference held through the owning task's meta.
// TDB build: commits and post-commits the current transaction, then closes
// every table handle and the TDB environment.
void streamStateClose(SStreamState* pState, bool remove) {
  // read the owner before the state is destroyed below
  SStreamTask* pTask = pState->pTdbState->pOwner;
#ifdef USE_ROCKSDB
  streamStateDestroy(pState, remove);
  taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
  STdbState* pTdb = pState->pTdbState;

  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);
#endif
}

// Starts a write / read-uncommitted TDB transaction and stores it in
// pTdbState->txn. When tdbBegin fails, tdbAbort is called and -1 is returned.
// The RocksDB build does nothing and returns 0.
int32_t streamStateBegin(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) >= 0) {
    return 0;
  }

  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
#endif
}

// Commits the current state and advances checkPointId.
// RocksDB build: when a file state exists, takes its snapshot and flushes it;
// always returns 0.
// TDB build: commits and post-commits the open transaction, then begins a new
// one; returns -1 as soon as any of these steps fails (checkPointId is not
// advanced in that case).
int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
  if (pState->pFileState != NULL) {
    SStreamSnapshot* pSnap = getSnapshot(pState->pFileState);
    flushSnapshot(pState->pFileState, pSnap, true);
  }
  pState->checkPointId += 1;
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbCommit(pTdb->db, pTdb->txn) < 0 || tdbPostCommit(pTdb->db, pTdb->txn) < 0) {
    return -1;
  }

  // open the next transaction so later writes have somewhere to go
  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  pState->checkPointId += 1;
  return 0;
#endif
}

L
liuyao 已提交
279
// Stores `value` (vLen bytes) for `key`.
// RocksDB build: fetches the row buffer for `key` and copies `value` into its
// last rowSize bytes, where rowSize comes from streamFileStateGeSelectRowSize.
// Returns the code from getRowBuff, or a non-zero code when no row buffer is
// available.
// TDB build: upserts into the function-state table.
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  void*   pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
  // do not dereference a missing position / row buffer
  if (pVal == NULL || ((SRowBuffPos*)pVal)->pRowBuff == NULL) {
    return (code != 0) ? code : -1;
  }
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  memcpy(buf + len - rowSize, value, vLen);
  return code;
#else
  // NOTE(review): `key` is declared as SWinKey but sizeof(STupleKey) bytes are
  // handed to TDB - confirm both types have the same size.
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif
}
L
liuyao 已提交
292
// Looks up the function-state value for `key`.
// RocksDB build: fetches the row buffer for `key` and points *ppVal at its
// last rowSize bytes (rowSize from streamFileStateGeSelectRowSize); *pVLen is
// not written in this build. Returns the code from getRowBuff, or a non-zero
// code when no row buffer is available (*ppVal is left untouched then).
// TDB build: reads from the function-state table into *ppVal / *pVLen.
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  void*   pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
  // do not dereference a missing position / row buffer
  if (pVal == NULL || ((SRowBuffPos*)pVal)->pRowBuff == NULL) {
    return (code != 0) ? code : -1;
  }
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  *ppVal = buf + len - rowSize;
  return code;
#else
  // NOTE(review): `key` is declared as SWinKey but sizeof(STupleKey) bytes are
  // handed to TDB - confirm both types have the same size.
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
#endif
}

306
// todo refactor
307
// Writes `value` (vLen bytes) under `key` for the current operator number.
// RocksDB build: nothing is written here and 0 is returned.
// TDB build: upserts into the state table using {key, pState->number} as the
// composite key, inside the current transaction.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pTdb->txn);
#endif
}
5
54liuyao 已提交
316

dengyihao's avatar
dengyihao 已提交
317
// Fetches the value stored under `key` into *pVal / *pVLen.
// RocksDB build: delegates to getRowBuff on the file state.
// TDB build: reads the state table using {key, pState->number} as the
// composite key. Returns the callee's status code unchanged.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  TTB*      pDb = NULL;
  (void)pDb;

  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}
5
54liuyao 已提交
325 326

// Reports whether a value exists for `key`.
// RocksDB build: asks the file state via hasRowBuff.
// TDB build: probes the state table with {key, pState->number}; the fetched
// value is only used as an existence check and is released right away.
bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  void*     pVal = NULL;
  int32_t   vLen = 0;

  int32_t code = tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pVal, &vLen);
  if (pVal != NULL) {
    tdbFree(pVal);
  }
  // tdbTbGet reports success with 0, so "found" is code == 0
  return code == 0;
#endif
}

// Resolves the row buffer for position `pos` into *pVal via getRowBuffByPos,
// then releases `pos` with releaseRowBuffPos. Returns the lookup's status code.
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
  const int32_t rc = getRowBuffByPos(pState->pFileState, pos, pVal);

  releaseRowBuffPos(pos);
  return rc;
}

341
// todo refactor
dengyihao's avatar
dengyihao 已提交
342
// Deletes the entry stored under `key`.
// RocksDB build: removes the row buffer from the file state.
// TDB build: deletes {key, pState->number} from the state table inside the
// current transaction. Returns the callee's status code unchanged.
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pStateDb, &sKey, sizeof(SStateKey), pTdb->txn);
#endif
}

// todo refactor
// Writes `value` (vLen bytes) under `key` in the fill-state store.
// RocksDB build: forwards to streamStateFillPut_rocksdb.
// TDB build: upserts into the fill-state table inside the current transaction.
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;

  return tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(SWinKey), value, vLen, pTdb->txn);
#endif
}

5
54liuyao 已提交
360 361
// todo refactor
// Reads the fill-state value stored under `key` into *pVal / *pVLen.
// RocksDB build: forwards to streamStateFillGet_rocksdb.
// TDB build: reads from the fill-state table.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;

  return tdbTbGet(pTdb->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
#endif
}

369
// todo refactor
dengyihao's avatar
dengyihao 已提交
370
// Deletes the fill-state entry stored under `key`.
// RocksDB build: forwards to streamStateFillDel_rocksdb.
// TDB build: deletes from the fill-state table inside the current transaction.
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;

  return tdbTbDelete(pTdb->pFillStateDb, key, sizeof(SWinKey), pTdb->txn);
#endif
}

378
// Removes the stored state entries. Always returns 0.
// RocksDB build: clears the file state and, when needClearDiskBuff says so,
// also clears the RocksDB-backed data.
// TDB build: inserts an all-zero key, then repeatedly seeks the entry after it
// and deletes that entry until the seek/read no longer succeeds.
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  streamFileStateClear(pState->pFileState);
  if (needClearDiskBuff(pState->pFileState)) {
    streamStateClear_rocksdb(pState);
  }
  return 0;
#else
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);

  for (;;) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
    SWinKey          delKey = {0};
    int32_t          code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
    streamStateFreeCur(pCur);

    if (code != 0) {
      break;
    }
    streamStateDel(pState, &delKey);
  }
  return 0;
#endif
}

// Records `number` on the state. Other functions in this file use it as the
// opNum part of composite keys and copy it into new cursors.
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

L
fix bug  
liuyao 已提交
405 406 407
// Persists pVal (vLen bytes) under pKey through a write batch.
// RocksDB build: adds the pair to a fresh batch (passing "default" as the
// target name), writes the batch and destroys it. Returns the first non-zero
// code from either step, otherwise 0. keyLen is not used.
// TDB build: does nothing and returns 0.
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
  void* batch = streamStateCreateBatch();

  int32_t code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
  if (code == 0) {
    code = streamStatePutBatch_rocksdb(pState, batch);
  }

  // the batch is released on every path, including a failed put
  streamStateDestroyBatch(batch);
  return code;
#else
  return 0;
#endif
}

// Reads the value stored under pKey into *pVal / *pLen.
// RocksDB build: forwards to streamDefaultGet_rocksdb and returns its code.
// keyLen is not used.
// TDB build: does nothing and returns 0.
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifdef USE_ROCKSDB
  int32_t rc = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
  return rc;
#else
  return 0;
#endif
}

432
// Returns the value for `key`, creating a zero-filled one when it is missing.
// RocksDB build: plain streamStateGet.
// TDB build: on entry *pVLen holds the size to allocate if the key is absent.
// When streamStateGet succeeds its result is returned as is; otherwise *pVal
// receives a zeroed buffer of that size obtained from tdbRealloc.
// Returns 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the buffer
// cannot be allocated.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGet(pState, key, pVal, pVLen);
#else
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  *pVal = tdbRealloc(NULL, size);
  if (*pVal == NULL) {
    if (size > 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    // nothing was requested, so there is nothing to zero
    return 0;
  }
  memset(*pVal, 0, size);
  return 0;
#endif
}

// Releases a value buffer previously handed out for `key`. Always returns 0.
// A NULL pVal is accepted and ignored. RocksDB build frees with
// taosMemoryFree; TDB build frees through streamStateFreeVal.
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  qDebug("streamStateReleaseBuf");
  if (pVal != NULL) {
#ifdef USE_ROCKSDB
    taosMemoryFree(pVal);
#else
    streamStateFreeVal(pVal);
#endif
  }
  return 0;
}

5
54liuyao 已提交
461
// Opens a cursor on the fill-state store positioned exactly at `key`.
// RocksDB build: forwards to streamStateFillGetCur_rocksdb.
// TDB build: returns NULL when the cursor cannot be allocated, opened or
// moved, or when tdbTbcMoveTo reports a non-zero comparison result (i.e. it
// did not land on `key`). The caller releases the cursor with
// streamStateFreeCur.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // same failure handling as the other seek helpers in this file
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

// Returns a fill-state cursor positioned at `key` whose entry belongs to the
// same group as `key`, or NULL when no such entry exists.
// RocksDB build: forwards to streamStateGetAndCheckCur_rocksdb.
// TDB build: combines streamStateFillGetCur with streamStateGetGroupKVByCur;
// `key` is overwritten with the key read through the cursor.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) == 0) {
    return pCur;
  }

  streamStateFreeCur(pCur);
  return NULL;
#endif
}

495
// Reads the entry under a state-table cursor.
// RocksDB build: forwards to streamStateGetKVByCur_rocksdb.
// TDB build: returns -1 when pCur is NULL, when tdbTbcGet fails, or when the
// entry's opNum differs from the cursor's number; otherwise copies the window
// key into *pKey and returns 0. *pVal / *pVLen are filled by tdbTbcGet.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  // entries written under a different operator number do not count
  if (pStored->opNum != pCur->number) return -1;

  *pKey = pStored->key;
  return 0;
#endif
}

// Reads the entry under a fill-state cursor.
// RocksDB build: forwards to streamStateFillGetKVByCur_rocksdb.
// TDB build: returns -1 when pCur is NULL or tdbTbcGet fails; otherwise copies
// the stored key into *pKey and returns 0.
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  *pKey = *pStored;
  return 0;
#endif
}

5
54liuyao 已提交
532
// Reads the entry under a fill-state cursor and accepts it only when it
// belongs to the group id that pKey carried on entry.
// RocksDB build: forwards to streamStateGetGroupKVByCur_rocksdb.
// TDB build: returns 0 on a group match and -1 otherwise. *pKey is overwritten
// whenever the underlying read succeeds, even if the group does not match.
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  // remember the requested group before the read overwrites *pKey
  const uint64_t wanted = pKey->groupId;

  if (streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen) != 0) return -1;

  return (pKey->groupId == wanted) ? 0 : -1;
#endif
}

550
// Writes the first stored window key into *key.
// RocksDB build: forwards to streamStateGetFirst_rocksdb.
// TDB build: inserts an all-zero key as an anchor, reads the entry that
// follows it, then removes the anchor again. Returns the read's status code.
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
  // todo refactor
  SWinKey anchor = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &anchor, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &anchor);
  int32_t          rc = streamStateGetKVByCur(pCur, key, NULL, 0);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &anchor);
  return rc;
#endif
}

565
// Moves the cursor to the first entry.
// RocksDB build: seeks the RocksDB iterator and returns 0.
// TDB build: returns the result of tdbTbcMoveToFirst.
// pState is not used.
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
  int32_t rc = tdbTbcMoveToFirst(pCur->pCur);
  return rc;
#endif
}

// Moves the cursor to the last entry.
// RocksDB build: seeks the RocksDB iterator and returns 0.
// TDB build: returns the result of tdbTbcMoveToLast.
// pState is not used.
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
  int32_t rc = tdbTbcMoveToLast(pCur->pCur);
  return rc;
#endif
}

583
// Opens a state-table cursor and seeks it relative to {key, pState->number}.
// RocksDB build: forwards to streamStateSeekKeyNext_rocksdb.
// TDB build: after tdbTbcMoveTo, the cursor is returned as is when the
// reported comparison result is > 0; otherwise it is advanced one entry first.
// Returns NULL (cursor already freed) on allocation, open or move failure.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  int32_t   cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) goto _err;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &cmp) < 0) goto _err;
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _err;

  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
614
// Opens a fill-state cursor and seeks it relative to `key`.
// RocksDB build: forwards to streamStateFillSeekKeyNext_rocksdb.
// TDB build: after tdbTbcMoveTo, the cursor is returned as is when the
// reported comparison result is > 0; otherwise it is advanced one entry first.
// Returns NULL (cursor already freed) on allocation, open or move failure.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _err;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _err;
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _err;

  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
643
// Opens a fill-state cursor and seeks it relative to `key`, looking backwards.
// RocksDB build: forwards to streamStateFillSeekKeyPrev_rocksdb.
// TDB build: after tdbTbcMoveTo, the cursor is returned as is when the
// reported comparison result is < 0; otherwise it is moved back one entry
// first. Returns NULL (cursor already freed) on allocation, open or move
// failure.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _err;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _err;
  if (cmp >= 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _err;

  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

// Advances the cursor by one entry.
// RocksDB build: forwards to streamStateCurNext_rocksdb.
// TDB build: returns -1 for a NULL cursor, otherwise the result of
// tdbTbcMoveToNext.
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
  return (pCur == NULL) ? -1 : tdbTbcMoveToNext(pCur->pCur);
#endif
}

// Moves the cursor back by one entry.
// RocksDB build: forwards to streamStateCurPrev_rocksdb.
// TDB build: returns -1 for a NULL cursor, otherwise the result of
// tdbTbcMoveToPrev.
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
  return (pCur == NULL) ? -1 : tdbTbcMoveToPrev(pCur->pCur);
#endif
}
694
// Releases a cursor and everything it holds: the RocksDB iterator, snapshot
// and read options when present, the TDB cursor, and the cursor object itself.
// A NULL pCur is ignored.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }
  qDebug("streamStateFreeCur");

  // Cursors created by the TDB code paths in this file are calloc'ed and never
  // receive RocksDB handles, so each handle is released only when it is set.
  if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
  if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);

  tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

707
// Frees a value buffer with the allocator that matches the build:
// taosMemoryFree under USE_ROCKSDB, tdbFree otherwise.
void streamStateFreeVal(void* val) {
#ifndef USE_ROCKSDB
  tdbFree(val);
#else
  taosMemoryFree(val);
#endif
}
5
54liuyao 已提交
714 715

// Writes `value` (vLen bytes) under the session `key`.
// RocksDB build: forwards to streamStateSessionPut_rocksdb.
// TDB build: upserts into the session-state table using
// {key, pState->number} as the composite key, inside the current transaction.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  return tdbTbUpsert(pTdb->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, pTdb->txn);
#endif
}

// Fetches the session entry whose window starts at key->win.skey.
// RocksDB build: forwards to streamStateSessionGet_rocksdb.
// TDB build: seeks with streamStateSessionSeekKeyCurrentNext and reads the
// entry under the cursor. On a match (same skey) *key is replaced by the
// stored key and *pVal receives a copy of the value allocated with tdbRealloc
// (length in *pVLen). Returns 0 on success and -1 when nothing matches or the
// copy cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY in that case).
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // `tmp` comes from the cursor read, so the caller gets a private copy
      void* pCopy = tdbRealloc(NULL, *pVLen);
      if (pCopy == NULL && *pVLen > 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = -1;
      } else {
        if (pCopy != NULL) {
          memcpy(pCopy, tmp, *pVLen);
        }
        *key = resKey;
        *pVal = pCopy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
#endif
}

// Deletes the session entry stored under `key`.
// RocksDB build: forwards to streamStateSessionDel_rocksdb.
// TDB build: deletes {key, pState->number} from the session-state table
// inside the current transaction.
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  return tdbTbDelete(pTdb->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pTdb->txn);
#endif
}

757
// Opens a session-state cursor and seeks it relative to
// {key, pState->number}, looking backwards.
// RocksDB build: forwards to streamStateSessionSeekKeyCurrentPrev_rocksdb.
// TDB build: after tdbTbcMoveTo, the cursor is returned as is when the
// reported comparison result is >= 0; otherwise it is moved back one entry
// first. Returns NULL (cursor already freed) on allocation, open or move
// failure.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _err;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _err;
  if (cmp < 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _err;

  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

788
// Opens a session-state cursor and seeks it relative to
// {key, pState->number}, looking forwards.
// RocksDB build: forwards to streamStateSessionSeekKeyCurrentNext_rocksdb.
// TDB build: after tdbTbcMoveTo, the cursor is returned as is when the
// reported comparison result is <= 0; otherwise it is advanced one entry
// first. Returns NULL (cursor already freed) on allocation, open or move
// failure.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return NULL;
  }
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _err;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _err;
  if (cmp > 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _err;

  return pCur;

_err:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

/*
 * Opens a cursor on the session state table and seeks it to `key` (tagged with
 * pState->number). When the seek reports a negative comparison result the
 * cursor is returned as-is; otherwise it is advanced one entry first.
 *
 * Returns the cursor, or NULL if allocation, opening, seeking or advancing fails.
 * The cursor is released with streamStateFreeCur().
 */
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCursor) {
    return NULL;
  }
  pCursor->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCursor->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCursor->pCur, &target, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  // A zero or positive seek result means the cursor must move one entry forward.
  if (cmp >= 0 && tdbTbcMoveToNext(pCursor->pCur) < 0) goto _fail;

  return pCursor;

_fail:
  streamStateFreeCur(pCursor);
  return NULL;
#endif
}

/*
 * Reads the entry under the cursor.
 *
 * pKey is in/out: a non-zero pKey->groupId on entry acts as a filter, and on
 * success *pKey is overwritten with the stored session key. pVal and pVLen are
 * forwarded unchanged to tdbTbcGet().
 *
 * Returns 0 on success. Returns -1 when pCur is NULL, the read fails, the
 * entry's opNum differs from pCur->number, or the group filter does not match.
 */
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen = 0;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }

  bool sameOperator = (pStored->opNum == pCur->number);
  bool groupAccepted = (pKey->groupId == 0 || pKey->groupId == pStored->key.groupId);
  if (!sameOperator || !groupAccepted) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

/*
 * Walks the session entries starting from the all-zero key and rewrites each
 * one with a zero-filled value of the same size. The walk stops at the first
 * entry that cannot be read or whose value size is not positive.
 * Always returns 0 in the non-rocksdb path.
 */
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &startKey);

  for (;;) {
    SSessionKey curKey = {0};
    void*       pData = NULL;
    int32_t     dataLen = 0;

    if (streamStateSessionGetKVByCur(pCur, &curKey, &pData, &dataLen) != 0 || dataLen <= 0) {
      break;
    }

    memset(pData, 0, dataLen);
    streamStateSessionPut(pState, &curKey, pData, dataLen);
    streamStateCurNext(pState, pCur);
  }

  streamStateFreeCur(pCur);
  return 0;
#endif
}

/*
 * Looks for a stored session whose window overlaps `key` (as judged by
 * sessionRangeKeyCmpr() returning 0).
 *
 * The entry the seek lands on is tested first. If it does not match, one
 * neighbour is tested: the next entry when the seek result is positive, the
 * previous entry when it is negative.
 *
 * Returns 0 and stores the matching key in *curKey, or -1 when nothing matches
 * or the cursor cannot be created or positioned.
 */
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) {
    return -1;
  }
  pCur->number = pState->number;

  int32_t          result = -1;
  int32_t          cmp = 0;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  SSessionKey      found = *key;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _out;
  if (tdbTbcMoveTo(pCur->pCur, &target, sizeof(SStateSessionKey), &cmp) < 0) goto _out;

  bool hit = streamStateSessionGetKVByCur(pCur, &found, NULL, 0) == 0 && sessionRangeKeyCmpr(key, &found) == 0;

  if (!hit && cmp != 0) {
    // Try the single neighbour on the side indicated by the seek result.
    if (cmp > 0) {
      streamStateCurNext(pState, pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    hit = streamStateSessionGetKVByCur(pCur, &found, NULL, 0) == 0 && sessionRangeKeyCmpr(key, &found) == 0;
  }

  if (hit) {
    *curKey = found;
    result = 0;
  }

_out:
  streamStateFreeCur(pCur);
  return result;
#endif
}

/*
 * Finds a stored session that overlaps `key` widened by `gap` on both sides,
 * or prepares an empty value if there is none.
 *
 * On a match the stored value is copied into a new buffer, the stored entry is
 * deleted, *key holds the matched key and 0 is returned. Without a match *key
 * is restored to its input value, the new buffer is zero-filled and 1 is
 * returned. In both cases *pVal points at the new buffer, which is allocated
 * with tdbRealloc() using the size passed in through *pVLen.
 *
 * Returns -1 if the buffer cannot be allocated.
 * NOTE(review): the copy uses the caller-supplied size, not the stored value's
 * length; this assumes stored values are at least that large - confirm.
 */
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
  // todo refactor
  const int32_t valSize = *pVLen;
  SSessionKey   originKey = *key;
  SSessionKey   searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;

  void* pNew = tdbRealloc(NULL, valSize);
  if (pNew == NULL) {
    return -1;
  }

  bool             merged = false;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);

  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    merged = (sessionRangeKeyCmpr(&searchKey, key) == 0);
    if (!merged) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    // Nothing readable at or before the key: restart from the entry after it.
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  if (!merged && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    merged = (sessionRangeKeyCmpr(&searchKey, key) == 0);
  }

  int32_t res = 0;
  if (merged) {
    memcpy(pNew, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    res = 1;
    memset(pNew, 0, valSize);
  }

  *pVal = pNew;
  streamStateFreeCur(pCur);
  return res;

#endif
}

/*
 * Finds a stored state window that `key` belongs to, or prepares an empty
 * value if there is none.
 *
 * The entry at or before `key` matches when its window fully covers the input
 * window, or when fn() accepts its state key (the last keyDataLen bytes of the
 * value, located using the size passed in through *pVLen). If it does not
 * match, the following entry is tested with fn() only.
 *
 * On a match the stored value is copied into a new buffer, the stored entry is
 * deleted, *key holds the matched key and 0 is returned. Without a match *key
 * is restored, the new buffer is zero-filled and 1 is returned. In both cases
 * *pVal points at the new buffer. Returns -1 if the buffer cannot be allocated.
 */
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  const int32_t valSize = *pVLen;
  SSessionKey   originKey = *key;

  void* pNew = tdbRealloc(NULL, valSize);
  if (pNew == NULL) {
    return -1;
  }

  bool             matched = false;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);

  if (streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    bool covers = (key->win.skey <= originKey.win.skey && originKey.win.ekey <= key->win.ekey);
    // fn() is consulted only when the window test fails.
    matched = covers || (fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true);
    if (!matched) {
      streamStateCurNext(pState, pCur);
    }
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
  }

  if (!matched && streamStateSessionGetKVByCur(pCur, key, pVal, pVLen) == 0) {
    matched = (fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true);
  }

  int32_t res = 0;
  if (matched) {
    memcpy(pNew, *pVal, valSize);
    streamStateSessionDel(pState, key);
  } else {
    *key = originKey;
    res = 1;
    memset(pNew, 0, valSize);
  }

  *pVal = pNew;
  streamStateFreeCur(pCur);
  return res;
#endif
}

/*
 * Stores the table name `tbname` for partition `groupId`.
 *
 * With USE_ROCKSDB the name goes into the in-memory parNameMap while the map
 * holds at most MAX_TABLE_NAME_NUM entries. Once the map is larger than that,
 * names for groups not already in the map are written to rocksdb instead, and
 * groups already in the map are left untouched.
 * Without USE_ROCKSDB the name is upserted into the tdb parname table.
 *
 * Returns the result of the underlying write, so a failed map insert or
 * rocksdb write is reported to the caller, as the tdb path already does.
 * Returns TSDB_CODE_SUCCESS when nothing needed to be written.
 */
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  qDebug("try to write to cf parname");
#ifdef USE_ROCKSDB
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      return streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  return tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
#else
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
#endif
}

/*
 * Looks up the table name recorded for partition `groupId`.
 *
 * With USE_ROCKSDB the in-memory parNameMap is consulted first. On a hit, *pVal
 * receives a newly allocated TSDB_TABLE_NAME_LEN-byte copy of the name. On a
 * miss, rocksdb is queried only when the map holds more than
 * MAX_TABLE_NAME_NUM entries; otherwise TSDB_CODE_FAILED is returned.
 * Without USE_ROCKSDB the lookup is delegated to tdbTbGet().
 *
 * Returns TSDB_CODE_SUCCESS on a map hit, TSDB_CODE_FAILED on a miss or when
 * the copy cannot be allocated, or the result of the delegated lookup.
 */
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }

  void* pCopy = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCopy == NULL) {
    // Out of memory: report failure instead of copying through a NULL pointer.
    *pVal = NULL;
    return TSDB_CODE_FAILED;
  }
  memcpy(pCopy, pStr, TSDB_TABLE_NAME_LEN);
  *pVal = pCopy;
  return TSDB_CODE_SUCCESS;
#else
  int32_t len = 0;
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
#endif
}

/*
 * Releases a stream state object. A NULL pState is ignored.
 *
 * With USE_ROCKSDB the file state, the rocksdb-side state (`remove` is
 * forwarded to streamStateDestroy_rocksdb()) and the parNameMap are torn down
 * first. pState->pTdbState and pState itself are then freed in every build.
 */
void streamStateDestroy(SStreamState* pState, bool remove) {
  if (pState == NULL) {
    return;
  }
#ifdef USE_ROCKSDB
  streamFileStateDestroy(pState->pFileState);
  streamStateDestroy_rocksdb(pState, remove);
  tSimpleHashCleanup(pState->parNameMap);
#endif
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

/*
 * With USE_ROCKSDB, forwards `mark` to deleteExpiredCheckPoint() on the state's
 * file state and returns its result. Without it, does nothing and returns 0.
 */
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  code = deleteExpiredCheckPoint(pState->pFileState, mark);
#endif
  return code;
}

#if 0
// Debug-only dump helpers. This whole region is compiled out.

// Adds an snprintf() return value to *pLen, clamping the offset to size - 1.
// snprintf() returns the length it wanted to write, so without the clamp a
// truncated write would push *pLen past the buffer and make size - *pLen negative.
static void streamStateDumpAdvance(int64_t* pLen, int32_t size, int written) {
  if (written < 0) {
    return;
  }
  *pLen += written;
  if (*pLen > size - 1) {
    *pLen = size - 1;
  }
}

// Formats the session keys readable from the first entry of the session state
// table into a newly allocated 2048-byte string; output beyond that is dropped.
// Returns NULL if the cursor or buffer cannot be created or the first entry
// cannot be read.
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  if (streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  for (;;) {
    streamStateDumpAdvance(&len, size,
                           snprintf(dumpBuf + len, (size_t)(size - len), "||s:%15" PRId64 ",", key.win.skey));
    streamStateDumpAdvance(&len, size,
                           snprintf(dumpBuf + len, (size_t)(size - len), "e:%15" PRId64 ",", key.win.ekey));
    streamStateDumpAdvance(&len, size,
                           snprintf(dumpBuf + len, (size_t)(size - len), "g:%15" PRId64 "||", key.groupId));

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    if (streamStateSessionGetKVByCur(pCur, &key, NULL, 0) != 0) {
      break;
    }
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}

// Same as streamStateSessionDump(), but walks the interval state table and
// prints each SWinKey's ts and groupId.
char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*   buf = NULL;
  int32_t bufSize = 0;
  if (streamStateGetKVByCur(pCur, &key, (const void**)&buf, &bufSize) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  for (;;) {
    streamStateDumpAdvance(&len, size, snprintf(dumpBuf + len, (size_t)(size - len), "||s:%15" PRId64 ",", key.ts));
    streamStateDumpAdvance(&len, size,
                           snprintf(dumpBuf + len, (size_t)(size - len), "g:%15" PRId64 "||", key.groupId));

    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    if (streamStateGetKVByCur(pCur, &key, NULL, 0) != 0) {
      break;
    }
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
#endif