streamState.c 33.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
17
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
18
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
19
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
20
#include "streamBackendRocksdb.h"
21
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
26

L
liuyao 已提交
27
#define MAX_TABLE_NAME_NUM 200000
L
liuyao 已提交
28

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
81 82 83 84 85 86 87 88 89
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

L
liuyao 已提交
90
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
91 92
}

93 94
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qDebug("open stream state, %s", path);
95 96 97 98 99
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
100

5
54liuyao 已提交
101 102 103
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
dengyihao's avatar
dengyihao 已提交
104
    streamStateDestroy(pState, true);
5
54liuyao 已提交
105 106
    return NULL;
  }
L
Liu Jicong 已提交
107

108
  SStreamTask* pStreamTask = pTask;
dengyihao's avatar
dengyihao 已提交
109
  char         statePath[1024];
L
Liu Jicong 已提交
110
  if (!specPath) {
111
    sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
L
Liu Jicong 已提交
112
  } else {
113 114
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
115
  }
116 117 118 119

  pState->taskId = pStreamTask->id.taskId;
  pState->streamId = pStreamTask->id.streamId;

dengyihao's avatar
dengyihao 已提交
120
#ifdef USE_ROCKSDB
121
  SStreamMeta* pMeta = pStreamTask->pMeta;
dengyihao's avatar
dengyihao 已提交
122
  pState->streamBackendRid = pMeta->streamBackendRid;
123
  int code = streamStateOpenBackend(pMeta->streamBackend, pState);
dengyihao's avatar
dengyihao 已提交
124
  if (code == -1) {
dengyihao's avatar
dengyihao 已提交
125
    taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
126 127 128
    taosMemoryFree(pState);
    pState = NULL;
  }
129

dengyihao's avatar
dengyihao 已提交
130
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
131
  pState->pFileState = NULL;
L
liuyao 已提交
132
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
dengyihao's avatar
dengyihao 已提交
133

134
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
135 136 137
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
140 141
  sprintf(cfgPath, "%s/cfg", statePath);

D
dapan1121 已提交
142 143
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
144 145 146 147
  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
D
dapan1121 已提交
148
    int64_t size = 0;
L
add cfg  
Liu Jicong 已提交
149
    taosFStatFile(pCfgFile, &size, NULL);
D
dapan1121 已提交
150 151 152 153
    if (size > 0) {
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
L
add cfg  
Liu Jicong 已提交
154
  } else {
D
dapan1121 已提交
155 156 157 158 159 160
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      sprintf(cfg, "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
L
add cfg  
Liu Jicong 已提交
161 162 163
  }
  taosCloseFile(&pCfgFile);

164
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
165 166 167 168
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
169 170
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
171 172 173
    goto _err;
  }

5
54liuyao 已提交
174
  // todo refactor
5
54liuyao 已提交
175 176
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
177 178 179
    goto _err;
  }

5
54liuyao 已提交
180 181
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
182 183 184
    goto _err;
  }

5
54liuyao 已提交
185 186
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
187 188 189
    goto _err;
  }

5
54liuyao 已提交
190 191
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
192 193 194
    goto _err;
  }

L
Liu Jicong 已提交
195 196 197 198 199
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

200
  if (streamStateBegin(pState) < 0) {
201 202 203
    goto _err;
  }

5
54liuyao 已提交
204
  pState->pTdbState->pOwner = pTask;
L
liuyao 已提交
205
  pState->checkPointId = 0;
206 207 208 209

  return pState;

_err:
5
54liuyao 已提交
210 211 212 213 214
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
215
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
216
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
217
  streamStateDestroy(pState, false);
218
  return NULL;
dengyihao's avatar
dengyihao 已提交
219
#endif
220 221
}

dengyihao's avatar
dengyihao 已提交
222
void streamStateClose(SStreamState* pState, bool remove) {
dengyihao's avatar
dengyihao 已提交
223
  SStreamTask* pTask = pState->pTdbState->pOwner;
dengyihao's avatar
dengyihao 已提交
224
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
225
  streamStateDestroy(pState, remove);
dengyihao's avatar
dengyihao 已提交
226
#else
227 228
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
229 230 231 232 233
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
234
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
235
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
236
#endif
237 238 239
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
240 241 242
#ifdef USE_ROCKSDB
  return 0;
#else
243
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
244
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
245
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
246 247 248
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
249
#endif
250 251 252
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
253
#ifdef USE_ROCKSDB
L
liuyao 已提交
254 255 256 257
  if (pState->pFileState) {
    SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
    flushSnapshot(pState->pFileState, pShot, true);
  }
5
54liuyao 已提交
258
  pState->checkPointId++;
dengyihao's avatar
dengyihao 已提交
259 260
  return 0;
#else
261
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
262 263
    return -1;
  }
264
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
265 266
    return -1;
  }
267

268
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
269
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
270 271
    return -1;
  }
L
liuyao 已提交
272
  pState->checkPointId++;
273
  return 0;
dengyihao's avatar
dengyihao 已提交
274
#endif
275 276
}

L
liuyao 已提交
277
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
278
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
279 280 281 282
  void*    pVal = NULL;
  int32_t  len = 0;
  int32_t  code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
L
liuyao 已提交
283 284 285
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  memcpy(buf + len - rowSize, value, vLen);
  return code;
dengyihao's avatar
dengyihao 已提交
286
#else
287
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
288
#endif
289
}
L
liuyao 已提交
290
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
291
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
292 293 294 295
  void*    pVal = NULL;
  int32_t  len = 0;
  int32_t  code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
  char*    buf = ((SRowBuffPos*)pVal)->pRowBuff;
L
liuyao 已提交
296 297 298
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  *ppVal = buf + len - rowSize;
  return code;
dengyihao's avatar
dengyihao 已提交
299
#else
L
liuyao 已提交
300
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
dengyihao's avatar
dengyihao 已提交
301
#endif
302 303
}

304
// todo refactor
305
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
306
#ifdef USE_ROCKSDB
5
54liuyao 已提交
307 308
  return 0;
  // return streamStatePut_rocksdb(pState, key, value, vLen);
dengyihao's avatar
dengyihao 已提交
309
#else
310
  SStateKey sKey = {.key = *key, .opNum = pState->number};
311
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
312
#endif
313
}
5
54liuyao 已提交
314

dengyihao's avatar
dengyihao 已提交
315
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
316
#ifdef USE_ROCKSDB
5
54liuyao 已提交
317
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
318
#else
dengyihao's avatar
dengyihao 已提交
319 320
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
321
#endif
5
54liuyao 已提交
322
}
5
54liuyao 已提交
323 324

bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
325
#ifdef USE_ROCKSDB
5
54liuyao 已提交
326 327 328 329 330 331 332 333
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}

int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
L
liuyao 已提交
334 335 336
  int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
  releaseRowBuffPos(pos);
  return code;
5
54liuyao 已提交
337 338
}

339
// todo refactor
dengyihao's avatar
dengyihao 已提交
340
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
341
#ifdef USE_ROCKSDB
5
54liuyao 已提交
342
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
dengyihao's avatar
dengyihao 已提交
343
#else
344
  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
345 346 347 348 349 350 351 352 353 354
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
355
#endif
356 357
}

5
54liuyao 已提交
358 359
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
360 361 362
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
363
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
364
#endif
5
54liuyao 已提交
365 366
}

367
// todo refactor
dengyihao's avatar
dengyihao 已提交
368
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
369
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
370
  return streamStateFillDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
371
#else
dengyihao's avatar
dengyihao 已提交
372
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
373
#endif
374 375
}

376
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
377
#ifdef USE_ROCKSDB
5
54liuyao 已提交
378
  streamFileStateClear(pState->pFileState);
L
liuyao 已提交
379 380 381 382
  if (needClearDiskBuff(pState->pFileState)) {
    streamStateClear_rocksdb(pState);
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
383
#else
384 385 386 387
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
388 389
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
390
    streamStateFreeCur(pCur);
391 392 393 394 395 396 397
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
398
#endif
399 400 401 402
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

L
fix bug  
liuyao 已提交
403 404 405
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
406
  void*   batch = streamStateCreateBatch();
dengyihao's avatar
dengyihao 已提交
407

dengyihao's avatar
dengyihao 已提交
408
  code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
L
fix bug  
liuyao 已提交
409 410 411 412 413 414 415
  if (code != 0) {
    return code;
  }
  code = streamStatePutBatch_rocksdb(pState, batch);
  streamStateDestroyBatch(batch);
  return code;
#else
dengyihao's avatar
dengyihao 已提交
416
  return 0;
L
fix bug  
liuyao 已提交
417 418 419 420 421
#endif
}

int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
422 423 424
  int32_t code = 0;
  code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
  return code;
L
fix bug  
liuyao 已提交
425
#else
dengyihao's avatar
dengyihao 已提交
426
  return 0;
L
fix bug  
liuyao 已提交
427 428 429
#endif
}

430
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
431
#ifdef USE_ROCKSDB
5
54liuyao 已提交
432
  return streamStateGet(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
433
#else
434 435 436 437 438
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
439 440 441
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
442
#endif
443 444 445 446
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
447
  qDebug("streamStateReleaseBuf");
5
54liuyao 已提交
448 449 450
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
451 452 453
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
454
  streamStateFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
455
#endif
456 457 458
  return 0;
}

5
54liuyao 已提交
459
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
460 461 462
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
463 464
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
465
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
466

5
54liuyao 已提交
467
  int32_t c = 0;
5
54liuyao 已提交
468 469
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
470
    streamStateFreeCur(pCur);
5
54liuyao 已提交
471 472 473
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
474
#endif
5
54liuyao 已提交
475 476 477
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
478 479 480
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
481 482 483 484 485 486
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
487
    streamStateFreeCur(pCur);
5
54liuyao 已提交
488 489
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
490
#endif
5
54liuyao 已提交
491 492
}

493
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
494 495 496
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
497 498 499 500
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
501
  int32_t kLen;
502 503 504 505 506 507 508 509
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
510
#endif
511 512 513
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
514 515 516
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
517 518 519
  if (!pCur) {
    return -1;
  }
520
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
521
  int32_t kLen;
522 523 524 525 526
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
527
#endif
528 529
}

5
54liuyao 已提交
530
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
531 532 533
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
534 535 536
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
537
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
538
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
539 540 541 542 543 544
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
545
#endif
5
54liuyao 已提交
546 547
}

548
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
549 550 551
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
552 553 554 555
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
556
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
557
  streamStateFreeCur(pCur);
558 559
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
560
#endif
561 562
}

563
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
564 565 566 567
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
568
  return tdbTbcMoveToFirst(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
569
#endif
570 571 572
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
573 574 575 576
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
577
  return tdbTbcMoveToLast(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
578
#endif
579 580
}

581
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
582 583 584
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
585 586 587 588 589
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
590
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
591
    streamStateFreeCur(pCur);
592 593 594 595
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
596
  int32_t c = 0;
597
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
598
    streamStateFreeCur(pCur);
599 600 601 602 603
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
604
    streamStateFreeCur(pCur);
605 606 607 608
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
609
#endif
610 611
}

5
54liuyao 已提交
612
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
613 614 615
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
616
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
617
  if (!pCur) {
618 619
    return NULL;
  }
5
54liuyao 已提交
620
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
621
    streamStateFreeCur(pCur);
5
54liuyao 已提交
622 623
    return NULL;
  }
624

5
54liuyao 已提交
625
  int32_t c = 0;
626
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
627
    streamStateFreeCur(pCur);
628 629 630 631 632
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
633
    streamStateFreeCur(pCur);
634 635 636 637
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
638
#endif
639 640
}

5
54liuyao 已提交
641
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
642 643 644
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
645 646 647 648
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
649
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
650
    streamStateFreeCur(pCur);
5
54liuyao 已提交
651 652
    return NULL;
  }
653

5
54liuyao 已提交
654
  int32_t c = 0;
655
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
656
    streamStateFreeCur(pCur);
657 658 659 660 661
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
662
    streamStateFreeCur(pCur);
663 664 665 666
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
667
#endif
668 669 670
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
671 672 673
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
674 675 676
  if (!pCur) {
    return -1;
  }
677 678
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
679
#endif
680 681 682
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
683 684 685
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
686 687 688
  if (!pCur) {
    return -1;
  }
689
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
690
#endif
691
}
692
void streamStateFreeCur(SStreamStateCur* pCur) {
693 694 695
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
696
  qDebug("streamStateFreeCur");
dengyihao's avatar
dengyihao 已提交
697
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
698
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
dengyihao's avatar
dengyihao 已提交
699 700
  rocksdb_readoptions_destroy(pCur->readOpt);

dengyihao's avatar
dengyihao 已提交
701
  tdbTbcClose(pCur->pCur);
702 703 704
  taosMemoryFree(pCur);
}

705
void streamStateFreeVal(void* val) {
dengyihao's avatar
dengyihao 已提交
706 707 708 709 710 711
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
712 713

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
714 715 716
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
717
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
718
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
719
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
720
#endif
5
54liuyao 已提交
721 722 723
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
724 725 726 727
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

728
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
729 730 731
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
732
  if (code == 0) {
5
54liuyao 已提交
733 734 735 736 737 738 739
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
740 741
  }
  streamStateFreeCur(pCur);
742
  return code;
dengyihao's avatar
dengyihao 已提交
743
#endif
5
54liuyao 已提交
744 745 746
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
747 748 749
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
750
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
751
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
752
#endif
5
54liuyao 已提交
753 754
}

755
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
756 757 758
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
759 760 761 762 763
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
764
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
765 766 767 768 769
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
770
  int32_t c = 0;
5
54liuyao 已提交
771 772 773 774
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
775
  if (c >= 0) return pCur;
5
54liuyao 已提交
776 777 778 779 780 781 782

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
783
#endif
5
54liuyao 已提交
784 785
}

786
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
787 788 789
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
790 791 792 793 794
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
795
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
796 797 798 799 800
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
801
  int32_t c = 0;
802 803 804 805 806 807 808 809 810 811 812 813 814
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
815
#endif
816 817
}

5
54liuyao 已提交
818
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
819 820 821
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
822 823 824 825 826
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
827
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
828 829 830 831 832
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
833
  int32_t c = 0;
5
54liuyao 已提交
834 835 836 837
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
838
  if (c < 0) return pCur;
5
54liuyao 已提交
839 840 841 842 843 844 845

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
846
#endif
5
54liuyao 已提交
847 848
}

849
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
850 851 852
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
853 854 855
  if (!pCur) {
    return -1;
  }
856
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
857
  int32_t kLen;
858
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
859 860 861 862 863 864 865 866 867 868
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
869
#endif
5
54liuyao 已提交
870 871 872
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
873 874 875 876
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
877
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
878 879
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
880 881 882
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
883
    if (code == 0 && size > 0) {
5
54liuyao 已提交
884 885 886 887 888 889 890 891 892
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
893
#endif
5
54liuyao 已提交
894 895
}

896
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
897 898 899
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
900 901 902 903 904
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
905
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
906 907
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
908 909
  }

910
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
911
  int32_t c = 0;
912 913 914 915 916 917
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
918
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
940 941
    }
  }
942

943
  streamStateFreeCur(pCur);
944
  return -1;
dengyihao's avatar
dengyihao 已提交
945
#endif
946 947
}

948 949
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
950 951 952
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
953
  // todo refactor
dengyihao's avatar
dengyihao 已提交
954
  int32_t res = 0;
955 956 957 958 959
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
960
  void* tmp = tdbRealloc(NULL, valSize);
961 962 963 964 965
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
966
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
967 968 969 970 971 972 973 974 975
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
976
    streamStateFreeCur(pCur);
977
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
978
  }
979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
996
  streamStateFreeCur(pCur);
997
  return res;
dengyihao's avatar
dengyihao 已提交
998 999

#endif
5
54liuyao 已提交
1000 1001 1002 1003 1004
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
1005 1006 1007 1008 1009

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
1010
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
1011 1012
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
1013 1014 1015 1016
  if (!tmp) {
    return -1;
  }

1017
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
1018
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1019 1020 1021
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
1022
      streamStateSessionDel(pState, key);
1023 1024
      goto _end;
    }
5
54liuyao 已提交
1025 1026 1027 1028

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1029
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1030 1031
      goto _end;
    }
5
54liuyao 已提交
1032 1033 1034 1035 1036 1037

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
1038 1039
  }

1040
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1041
  if (code == 0) {
5
54liuyao 已提交
1042 1043 1044
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1045
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1059
#endif
5
54liuyao 已提交
1060
}
5
54liuyao 已提交
1061

1062
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
L
liuyao 已提交
1063
  qDebug("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1064
#ifdef USE_ROCKSDB
L
liuyao 已提交
1065 1066 1067 1068 1069 1070 1071 1072
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1073
#else
L
Liu Jicong 已提交
1074 1075
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1076
#endif
1077 1078 1079
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1080
#ifdef USE_ROCKSDB
L
liuyao 已提交
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }
  *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1091
#else
1092
  int32_t len;
5
54liuyao 已提交
1093
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1094
#endif
5
54liuyao 已提交
1095 1096
}

dengyihao's avatar
dengyihao 已提交
1097
void streamStateDestroy(SStreamState* pState, bool remove) {
dengyihao's avatar
dengyihao 已提交
1098
#ifdef USE_ROCKSDB
5
54liuyao 已提交
1099
  streamFileStateDestroy(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
1100
  streamStateDestroy_rocksdb(pState, remove);
L
fix bug  
liuyao 已提交
1101
  tSimpleHashCleanup(pState->parNameMap);
dengyihao's avatar
dengyihao 已提交
1102 1103
  // do nothong
#endif
5
54liuyao 已提交
1104 1105
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1106 1107
}

L
liuyao 已提交
1108 1109 1110 1111 1112 1113 1114 1115
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#ifdef USE_ROCKSDB
  return deleteExpiredCheckPoint(pState->pFileState, mark);
#else
  return 0;
#endif
}

5
54liuyao 已提交
1116 1117 1118 1119 1120 1121 1122
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1123
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1124 1125 1126 1127 1128 1129
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1130 1131 1132
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1133
  if (code != 0) {
1134
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1149
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1150 1151 1152 1153 1154 1155
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1156
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1157 1158
  return dumpBuf;
}
5
54liuyao 已提交
1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1202
#endif