streamState.c 33.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
17
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
18
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
19
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
20
#include "streamBackendRocksdb.h"
21
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
26 27
#include "ttimer.h"

L
liuyao 已提交
28
#define MAX_TABLE_NAME_NUM 2000000
L
liuyao 已提交
29

dengyihao's avatar
dengyihao 已提交
30
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
46
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
68
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
69 70 71 72 73 74 75 76 77
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

78
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
79 80
}

dengyihao's avatar
dengyihao 已提交
81
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
82 83 84 85 86 87 88 89 90
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

L
liuyao 已提交
91
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
92 93
}

94
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
dengyihao's avatar
dengyihao 已提交
95
  qWarn("open stream state, %s", path);
96 97 98 99 100
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
101 102 103
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
dengyihao's avatar
dengyihao 已提交
104
    streamStateDestroy(pState, true);
5
54liuyao 已提交
105 106
    return NULL;
  }
L
Liu Jicong 已提交
107

108
  char statePath[1024];
L
Liu Jicong 已提交
109
  if (!specPath) {
110
    sprintf(statePath, "%s/%d", path, pTask->id.taskId);
L
Liu Jicong 已提交
111
  } else {
112 113
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
114
  }
dengyihao's avatar
dengyihao 已提交
115 116
  pState->taskId = pTask->id.taskId;
  pState->streamId = pTask->id.streamId;
dengyihao's avatar
dengyihao 已提交
117
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
118
  // qWarn("open stream state1");
dengyihao's avatar
dengyihao 已提交
119
  taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
120
  int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
dengyihao's avatar
dengyihao 已提交
121
  if (code == -1) {
dengyihao's avatar
dengyihao 已提交
122
    taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
123 124 125 126
    taosMemoryFree(pState);
    pState = NULL;
  }
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
127
  pState->pFileState = NULL;
L
liuyao 已提交
128 129
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
130

dengyihao's avatar
dengyihao 已提交
131 132 133
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
134

L
Liu Jicong 已提交
135
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
136 137
  sprintf(cfgPath, "%s/cfg", statePath);

D
dapan1121 已提交
138 139
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
140 141 142 143
  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
D
dapan1121 已提交
144
    int64_t size = 0;
L
add cfg  
Liu Jicong 已提交
145
    taosFStatFile(pCfgFile, &size, NULL);
D
dapan1121 已提交
146 147 148 149
    if (size > 0) {
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
L
add cfg  
Liu Jicong 已提交
150
  } else {
D
dapan1121 已提交
151 152 153 154 155 156
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      sprintf(cfg, "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
L
add cfg  
Liu Jicong 已提交
157 158 159
  }
  taosCloseFile(&pCfgFile);

160
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
161 162 163 164
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
165 166
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
167 168 169
    goto _err;
  }

5
54liuyao 已提交
170
  // todo refactor
5
54liuyao 已提交
171 172
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
173 174 175
    goto _err;
  }

5
54liuyao 已提交
176 177
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
178 179 180
    goto _err;
  }

5
54liuyao 已提交
181 182
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
183 184 185
    goto _err;
  }

5
54liuyao 已提交
186 187
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
188 189 190
    goto _err;
  }

L
Liu Jicong 已提交
191 192 193 194 195
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

196
  if (streamStateBegin(pState) < 0) {
197 198 199
    goto _err;
  }

5
54liuyao 已提交
200
  pState->pTdbState->pOwner = pTask;
L
liuyao 已提交
201
  pState->checkPointId = 0;
202 203 204 205

  return pState;

_err:
5
54liuyao 已提交
206 207 208 209 210
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
211
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
212
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
213
  streamStateDestroy(pState, false);
214
  return NULL;
dengyihao's avatar
dengyihao 已提交
215
#endif
216 217
}

dengyihao's avatar
dengyihao 已提交
218
void streamStateClose(SStreamState* pState, bool remove) {
dengyihao's avatar
dengyihao 已提交
219
  SStreamTask* pTask = pState->pTdbState->pOwner;
dengyihao's avatar
dengyihao 已提交
220
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
221 222
  // streamStateCloseBackend(pState);
  streamStateDestroy(pState, remove);
dengyihao's avatar
dengyihao 已提交
223
  taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
dengyihao's avatar
dengyihao 已提交
224
#else
225 226
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
227 228 229 230 231
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
232
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
233
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
234
#endif
235 236 237
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
238 239 240
#ifdef USE_ROCKSDB
  return 0;
#else
241
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
242
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
243
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
244 245 246
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
247
#endif
248 249 250
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
251
#ifdef USE_ROCKSDB
L
liuyao 已提交
252 253 254 255
  if (pState->pFileState) {
    SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
    flushSnapshot(pState->pFileState, pShot, true);
  }
5
54liuyao 已提交
256
  pState->checkPointId++;
dengyihao's avatar
dengyihao 已提交
257 258
  return 0;
#else
259
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
260 261
    return -1;
  }
262
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
263 264
    return -1;
  }
265

266
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
267
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
268 269
    return -1;
  }
L
liuyao 已提交
270
  pState->checkPointId++;
271
  return 0;
dengyihao's avatar
dengyihao 已提交
272
#endif
273 274
}

L
liuyao 已提交
275
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
276
#ifdef USE_ROCKSDB
L
liuyao 已提交
277 278 279 280 281 282 283
  void* pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
  char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  memcpy(buf + len - rowSize, value, vLen);
  return code;
dengyihao's avatar
dengyihao 已提交
284
#else
285
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
286
#endif
287
}
L
liuyao 已提交
288
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
289
#ifdef USE_ROCKSDB
L
liuyao 已提交
290 291 292 293 294 295 296
  void* pVal = NULL;
  int32_t len = 0;
  int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
  char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
  uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
  *ppVal = buf + len - rowSize;
  return code;
dengyihao's avatar
dengyihao 已提交
297
#else
L
liuyao 已提交
298
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
dengyihao's avatar
dengyihao 已提交
299
#endif
300 301
}

302
// todo refactor
303
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
304
#ifdef USE_ROCKSDB
5
54liuyao 已提交
305 306
  return 0;
  // return streamStatePut_rocksdb(pState, key, value, vLen);
dengyihao's avatar
dengyihao 已提交
307
#else
308
  SStateKey sKey = {.key = *key, .opNum = pState->number};
309
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
310
#endif
311
}
5
54liuyao 已提交
312

dengyihao's avatar
dengyihao 已提交
313
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
314
#ifdef USE_ROCKSDB
5
54liuyao 已提交
315
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
316
#else
dengyihao's avatar
dengyihao 已提交
317 318
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
319
#endif
5
54liuyao 已提交
320
}
5
54liuyao 已提交
321 322

bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
323
#ifdef USE_ROCKSDB
5
54liuyao 已提交
324 325 326 327 328 329 330 331
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}

int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
L
liuyao 已提交
332 333 334
  int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
  releaseRowBuffPos(pos);
  return code;
5
54liuyao 已提交
335 336
}

337
// todo refactor
dengyihao's avatar
dengyihao 已提交
338
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
339
#ifdef USE_ROCKSDB
5
54liuyao 已提交
340
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
dengyihao's avatar
dengyihao 已提交
341
#else
342
  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
343 344 345 346 347 348 349 350 351 352
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
353
#endif
354 355
}

5
54liuyao 已提交
356 357
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
358 359 360
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
361
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
362
#endif
5
54liuyao 已提交
363 364
}

365
// todo refactor
dengyihao's avatar
dengyihao 已提交
366
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
367
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
368
  return streamStateFillDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
369
#else
dengyihao's avatar
dengyihao 已提交
370
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
371
#endif
372 373
}

374
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
375
#ifdef USE_ROCKSDB
5
54liuyao 已提交
376
  streamFileStateClear(pState->pFileState);
L
liuyao 已提交
377 378 379 380
  if (needClearDiskBuff(pState->pFileState)) {
    streamStateClear_rocksdb(pState);
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
381
#else
382 383 384 385
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
386 387
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
388
    streamStateFreeCur(pCur);
389 390 391 392 393 394 395
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
396
#endif
397 398 399 400
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

L
fix bug  
liuyao 已提交
401 402 403
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
404
  void*   batch = streamStateCreateBatch();
dengyihao's avatar
dengyihao 已提交
405

dengyihao's avatar
dengyihao 已提交
406
  code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
L
fix bug  
liuyao 已提交
407 408 409 410 411 412 413
  if (code != 0) {
    return code;
  }
  code = streamStatePutBatch_rocksdb(pState, batch);
  streamStateDestroyBatch(batch);
  return code;
#else
dengyihao's avatar
dengyihao 已提交
414
  return 0;
L
fix bug  
liuyao 已提交
415 416 417 418 419
#endif
}

int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
420 421 422
  int32_t code = 0;
  code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
  return code;
L
fix bug  
liuyao 已提交
423
#else
dengyihao's avatar
dengyihao 已提交
424
  return 0;
L
fix bug  
liuyao 已提交
425 426 427
#endif
}

428
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
429
#ifdef USE_ROCKSDB
5
54liuyao 已提交
430
  return streamStateGet(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
431
#else
432 433 434 435 436
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
437 438 439
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
440
#endif
441 442 443 444
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
445
  qDebug("streamStateReleaseBuf");
5
54liuyao 已提交
446 447 448
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
449 450 451
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
452
  streamFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
453
#endif
454 455 456
  return 0;
}

5
54liuyao 已提交
457
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
458 459 460
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
461 462
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
463
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
464

5
54liuyao 已提交
465
  int32_t c = 0;
5
54liuyao 已提交
466 467
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
468
    streamStateFreeCur(pCur);
5
54liuyao 已提交
469 470 471
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
472
#endif
5
54liuyao 已提交
473 474 475
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
476 477 478
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
479 480 481 482 483 484
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
485
    streamStateFreeCur(pCur);
5
54liuyao 已提交
486 487
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
488
#endif
5
54liuyao 已提交
489 490
}

491
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
492 493 494
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
495 496 497 498
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
499
  int32_t kLen;
500 501 502 503 504 505 506 507
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
508
#endif
509 510 511
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
512 513 514
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
515 516 517
  if (!pCur) {
    return -1;
  }
518
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
519
  int32_t kLen;
520 521 522 523 524
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
525
#endif
526 527
}

5
54liuyao 已提交
528
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
529 530 531
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
532 533 534
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
535
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
536
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
537 538 539 540 541 542
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
543
#endif
5
54liuyao 已提交
544 545
}

546
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
547 548 549
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
550 551 552 553
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
554
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
555
  streamStateFreeCur(pCur);
556 557
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
558
#endif
559 560
}

561
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
562 563 564 565
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
566
  return tdbTbcMoveToFirst(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
567
#endif
568 569 570
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
571 572 573 574
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
575
  return tdbTbcMoveToLast(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
576
#endif
577 578
}

579
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
580 581 582
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
583 584 585 586 587
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
588
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
589
    streamStateFreeCur(pCur);
590 591 592 593
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
594
  int32_t c = 0;
595
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
596
    streamStateFreeCur(pCur);
597 598 599 600 601
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
602
    streamStateFreeCur(pCur);
603 604 605 606
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
607
#endif
608 609
}

5
54liuyao 已提交
610
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
611 612 613
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
614
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
615
  if (!pCur) {
616 617
    return NULL;
  }
5
54liuyao 已提交
618
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
619
    streamStateFreeCur(pCur);
5
54liuyao 已提交
620 621
    return NULL;
  }
622

5
54liuyao 已提交
623
  int32_t c = 0;
624
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
625
    streamStateFreeCur(pCur);
626 627 628 629 630
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
631
    streamStateFreeCur(pCur);
632 633 634 635
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
636
#endif
637 638
}

5
54liuyao 已提交
639
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
640 641 642
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
643 644 645 646
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
647
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
648
    streamStateFreeCur(pCur);
5
54liuyao 已提交
649 650
    return NULL;
  }
651

5
54liuyao 已提交
652
  int32_t c = 0;
653
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
654
    streamStateFreeCur(pCur);
655 656 657 658 659
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
660
    streamStateFreeCur(pCur);
661 662 663 664
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
665
#endif
666 667 668
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
669 670 671
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
672 673 674
  if (!pCur) {
    return -1;
  }
675 676
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
677
#endif
678 679 680
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
681 682 683
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
684 685 686
  if (!pCur) {
    return -1;
  }
687
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
688
#endif
689
}
690
void streamStateFreeCur(SStreamStateCur* pCur) {
691 692 693
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
694
  qDebug("streamStateFreeCur");
dengyihao's avatar
dengyihao 已提交
695
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
696
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
dengyihao's avatar
dengyihao 已提交
697 698
  rocksdb_readoptions_destroy(pCur->readOpt);

dengyihao's avatar
dengyihao 已提交
699
  tdbTbcClose(pCur->pCur);
700 701 702
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
703 704 705 706 707 708 709
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
710 711

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
712 713 714
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
715
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
716
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
717
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
718
#endif
5
54liuyao 已提交
719 720 721
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
722 723 724 725
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

726
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
727 728 729
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
730
  if (code == 0) {
5
54liuyao 已提交
731 732 733 734 735 736 737
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
738 739
  }
  streamStateFreeCur(pCur);
740
  return code;
dengyihao's avatar
dengyihao 已提交
741
#endif
5
54liuyao 已提交
742 743 744
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
745 746 747
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
748
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
749
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
750
#endif
5
54liuyao 已提交
751 752
}

753
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
754 755 756
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
757 758 759 760 761
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
762
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
763 764 765 766 767
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
768
  int32_t c = 0;
5
54liuyao 已提交
769 770 771 772
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
773
  if (c >= 0) return pCur;
5
54liuyao 已提交
774 775 776 777 778 779 780

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
781
#endif
5
54liuyao 已提交
782 783
}

784
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
785 786 787
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
788 789 790 791 792
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
793
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
794 795 796 797 798
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
799
  int32_t c = 0;
800 801 802 803 804 805 806 807 808 809 810 811 812
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
813
#endif
814 815
}

5
54liuyao 已提交
816
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
817 818 819
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
820 821 822 823 824
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
825
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
826 827 828 829 830
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
831
  int32_t c = 0;
5
54liuyao 已提交
832 833 834 835
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
836
  if (c < 0) return pCur;
5
54liuyao 已提交
837 838 839 840 841 842 843

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
844
#endif
5
54liuyao 已提交
845 846
}

847
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
848 849 850
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
851 852 853
  if (!pCur) {
    return -1;
  }
854
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
855
  int32_t kLen;
856
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
857 858 859 860 861 862 863 864 865 866
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
867
#endif
5
54liuyao 已提交
868 869 870
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
871 872 873 874
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
875
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
876 877
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
878 879 880
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
881
    if (code == 0 && size > 0) {
5
54liuyao 已提交
882 883 884 885 886 887 888 889 890
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
891
#endif
5
54liuyao 已提交
892 893
}

894
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
895 896 897
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
898 899 900 901 902
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
903
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
904 905
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
906 907
  }

908
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
909
  int32_t c = 0;
910 911 912 913 914 915
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
916
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
938 939
    }
  }
940

941
  streamStateFreeCur(pCur);
942
  return -1;
dengyihao's avatar
dengyihao 已提交
943
#endif
944 945
}

946 947
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
948 949 950
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
951
  // todo refactor
dengyihao's avatar
dengyihao 已提交
952
  int32_t res = 0;
953 954 955 956 957
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
958
  void* tmp = tdbRealloc(NULL, valSize);
959 960 961 962 963
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
964
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
965 966 967 968 969 970 971 972 973
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
974
    streamStateFreeCur(pCur);
975
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
976
  }
977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
994
  streamStateFreeCur(pCur);
995
  return res;
dengyihao's avatar
dengyihao 已提交
996 997

#endif
5
54liuyao 已提交
998 999 1000 1001 1002
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
1003 1004 1005 1006 1007

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
1008
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
1009 1010
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
1011 1012 1013 1014
  if (!tmp) {
    return -1;
  }

1015
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
1016
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1017 1018 1019
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
1020
      streamStateSessionDel(pState, key);
1021 1022
      goto _end;
    }
5
54liuyao 已提交
1023 1024 1025 1026

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1027
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1028 1029
      goto _end;
    }
5
54liuyao 已提交
1030 1031 1032 1033 1034 1035

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
1036 1037
  }

1038
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1039
  if (code == 0) {
5
54liuyao 已提交
1040 1041 1042
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1043
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1057
#endif
5
54liuyao 已提交
1058
}
5
54liuyao 已提交
1059

1060
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
dengyihao's avatar
dengyihao 已提交
1061
  qWarn("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1062
#ifdef USE_ROCKSDB
L
liuyao 已提交
1063 1064 1065 1066 1067 1068 1069 1070
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1071
#else
L
Liu Jicong 已提交
1072 1073
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1074
#endif
1075 1076 1077
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1078
#ifdef USE_ROCKSDB
L
liuyao 已提交
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }
  *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1089
#else
1090
  int32_t len;
5
54liuyao 已提交
1091
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1092
#endif
5
54liuyao 已提交
1093 1094
}

dengyihao's avatar
dengyihao 已提交
1095
void streamStateDestroy(SStreamState* pState, bool remove) {
dengyihao's avatar
dengyihao 已提交
1096
#ifdef USE_ROCKSDB
5
54liuyao 已提交
1097
  streamFileStateDestroy(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
1098
  streamStateDestroy_rocksdb(pState, remove);
L
fix bug  
liuyao 已提交
1099
  tSimpleHashCleanup(pState->parNameMap);
dengyihao's avatar
dengyihao 已提交
1100 1101
  // do nothong
#endif
5
54liuyao 已提交
1102 1103
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1104 1105
}

L
liuyao 已提交
1106 1107 1108 1109 1110 1111 1112 1113
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#ifdef USE_ROCKSDB
  return deleteExpiredCheckPoint(pState->pFileState, mark);
#else
  return 0;
#endif
}

5
54liuyao 已提交
1114 1115 1116 1117 1118 1119 1120
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1121
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1122 1123 1124 1125 1126 1127
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1128 1129 1130
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1131
  if (code != 0) {
1132
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1147
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1148 1149 1150 1151 1152 1153
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1154
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1155 1156
  return dumpBuf;
}
5
54liuyao 已提交
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1200
#endif