streamState.c 32.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
17
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
18
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
19
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
20
#include "streamBackendRocksdb.h"
21
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
25 26
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
27
#define MAX_TABLE_NAME_NUM 100000
L
liuyao 已提交
28

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

  if (pWin1->key.ts > pWin2->key.ts) {
    return 1;
  } else if (pWin1->key.ts < pWin2->key.ts) {
    return -1;
  }

  if (pWin1->key.groupId > pWin2->key.groupId) {
    return 1;
  } else if (pWin1->key.groupId < pWin2->key.groupId) {
    return -1;
  }

  return 0;
}

105
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
dengyihao's avatar
dengyihao 已提交
106
  qWarn("open stream state, %s", path);
107 108 109 110 111
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
112 113 114 115 116 117
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }
L
Liu Jicong 已提交
118

119
  char statePath[1024];
L
Liu Jicong 已提交
120
  if (!specPath) {
121
    sprintf(statePath, "%s/%d", path, pTask->id.taskId);
L
Liu Jicong 已提交
122
  } else {
123 124
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
125
  }
dengyihao's avatar
dengyihao 已提交
126 127 128 129 130 131 132 133 134
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    taosMemoryFree(pState);
    pState = NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
5
54liuyao 已提交
135
  pState->pFileState = NULL;
L
liuyao 已提交
136 137
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
dengyihao's avatar
dengyihao 已提交
138 139 140
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
141

L
Liu Jicong 已提交
142
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
143 144
  sprintf(cfgPath, "%s/cfg", statePath);

D
dapan1121 已提交
145 146
  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
147 148 149 150
  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
D
dapan1121 已提交
151
    int64_t size = 0;
L
add cfg  
Liu Jicong 已提交
152
    taosFStatFile(pCfgFile, &size, NULL);
D
dapan1121 已提交
153 154 155 156
    if (size > 0) {
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
L
add cfg  
Liu Jicong 已提交
157
  } else {
D
dapan1121 已提交
158 159 160 161 162 163
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      sprintf(cfg, "%d\n%d\n", szPage, pages);
      taosWriteFile(pCfgFile, cfg, strlen(cfg));
    }
L
add cfg  
Liu Jicong 已提交
164 165 166
  }
  taosCloseFile(&pCfgFile);

167
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
168 169 170 171
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
172 173
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
174 175 176
    goto _err;
  }

5
54liuyao 已提交
177
  // todo refactor
5
54liuyao 已提交
178 179
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
180 181 182
    goto _err;
  }

5
54liuyao 已提交
183 184
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
185 186 187
    goto _err;
  }

5
54liuyao 已提交
188 189
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
190 191 192
    goto _err;
  }

5
54liuyao 已提交
193 194
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
195 196 197
    goto _err;
  }

L
Liu Jicong 已提交
198 199 200 201 202
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

203
  if (streamStateBegin(pState) < 0) {
204 205 206
    goto _err;
  }

5
54liuyao 已提交
207
  pState->pTdbState->pOwner = pTask;
L
liuyao 已提交
208
  pState->checkPointId = 0;
209 210 211 212

  return pState;

_err:
5
54liuyao 已提交
213 214 215 216 217
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
218
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
219 220
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
221
  return NULL;
dengyihao's avatar
dengyihao 已提交
222
#endif
223 224 225
}

void streamStateClose(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
226
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
227
  streamCleanBackend(pState);
dengyihao's avatar
dengyihao 已提交
228
#else
229 230
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
231 232 233 234 235
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
236
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
237
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
238
#endif
5
54liuyao 已提交
239
  streamStateDestroy(pState);
240 241 242
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
243 244 245
#ifdef USE_ROCKSDB
  return 0;
#else
246
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
247
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
248
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
249 250 251
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
252
#endif
253 254 255
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
256
#ifdef USE_ROCKSDB
L
liuyao 已提交
257 258 259 260
  if (pState->pFileState) {
    SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
    flushSnapshot(pState->pFileState, pShot, true);
  }
5
54liuyao 已提交
261
  pState->checkPointId++;
dengyihao's avatar
dengyihao 已提交
262 263
  return 0;
#else
264
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
265 266
    return -1;
  }
267
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
268 269
    return -1;
  }
270

271
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
272
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
273 274
    return -1;
  }
L
liuyao 已提交
275
  pState->checkPointId++;
276
  return 0;
dengyihao's avatar
dengyihao 已提交
277
#endif
278 279
}

280
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
281 282 283
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
284
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
285
#endif
286 287
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
288
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
289
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
290
#else
5
54liuyao 已提交
291
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
292
#endif
293 294 295
}

int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
dengyihao's avatar
dengyihao 已提交
296 297 298
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
299
  return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
300
#endif
301 302
}

303
// todo refactor
304
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
305
#ifdef USE_ROCKSDB
5
54liuyao 已提交
306 307
  return 0;
  // return streamStatePut_rocksdb(pState, key, value, vLen);
dengyihao's avatar
dengyihao 已提交
308
#else
309
  SStateKey sKey = {.key = *key, .opNum = pState->number};
310
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
311
#endif
312
}
5
54liuyao 已提交
313

dengyihao's avatar
dengyihao 已提交
314
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
315
#ifdef USE_ROCKSDB
5
54liuyao 已提交
316
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
317
#else
dengyihao's avatar
dengyihao 已提交
318 319
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
320
#endif
5
54liuyao 已提交
321
}
5
54liuyao 已提交
322 323

bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
324
#ifdef USE_ROCKSDB
5
54liuyao 已提交
325 326 327 328 329 330 331 332
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}

int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
L
liuyao 已提交
333 334 335
  int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
  releaseRowBuffPos(pos);
  return code;
5
54liuyao 已提交
336 337
}

338
// todo refactor
dengyihao's avatar
dengyihao 已提交
339
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
340
#ifdef USE_ROCKSDB
5
54liuyao 已提交
341
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
dengyihao's avatar
dengyihao 已提交
342
#else
343
  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
344 345 346 347 348 349 350 351 352 353
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
354
#endif
355 356
}

5
54liuyao 已提交
357 358
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
359 360 361
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
362
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
363
#endif
5
54liuyao 已提交
364 365
}

366
// todo refactor
dengyihao's avatar
dengyihao 已提交
367
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
368
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
369
  return streamStateFillDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
370
#else
dengyihao's avatar
dengyihao 已提交
371
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
372
#endif
373 374
}

375
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
376
#ifdef USE_ROCKSDB
5
54liuyao 已提交
377
  streamFileStateClear(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
378 379
  return streamStateClear_rocksdb(pState);
#else
380 381 382 383
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
384 385
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
386
    streamStateFreeCur(pCur);
387 388 389 390 391 392 393
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
394
#endif
395 396 397 398
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

L
fix bug  
liuyao 已提交
399 400 401
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
402
  void*   batch = streamStateCreateBatch();
L
fix bug  
liuyao 已提交
403 404 405 406 407 408 409 410
  code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
  if (code != 0) {
    return code;
  }
  code = streamStatePutBatch_rocksdb(pState, batch);
  streamStateDestroyBatch(batch);
  return code;
#else
dengyihao's avatar
dengyihao 已提交
411
  return 0;
L
fix bug  
liuyao 已提交
412 413 414 415 416
#endif
}

int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
417 418 419
  int32_t code = 0;
  code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
  return code;
L
fix bug  
liuyao 已提交
420
#else
dengyihao's avatar
dengyihao 已提交
421
  return 0;
L
fix bug  
liuyao 已提交
422 423 424
#endif
}

425
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
426
#ifdef USE_ROCKSDB
5
54liuyao 已提交
427
  return streamStateGet(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
428
#else
429 430 431 432 433
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
434 435 436
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
437
#endif
438 439 440 441
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
442
  qDebug("streamStateReleaseBuf");
5
54liuyao 已提交
443 444 445
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
446 447 448
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
449
  streamFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
450
#endif
451 452 453
  return 0;
}

5
54liuyao 已提交
454
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
455 456 457
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
458 459
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
460
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
461

5
54liuyao 已提交
462
  int32_t c = 0;
5
54liuyao 已提交
463 464
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
465
    streamStateFreeCur(pCur);
5
54liuyao 已提交
466 467 468
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
469
#endif
5
54liuyao 已提交
470 471 472
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
473 474 475
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
476 477 478 479 480 481
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
482
    streamStateFreeCur(pCur);
5
54liuyao 已提交
483 484
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
485
#endif
5
54liuyao 已提交
486 487
}

488
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
489 490 491
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
492 493 494 495
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
496
  int32_t kLen;
497 498 499 500 501 502 503 504
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
505
#endif
506 507 508
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
509 510 511
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
512 513 514
  if (!pCur) {
    return -1;
  }
515
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
516
  int32_t kLen;
517 518 519 520 521
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
522
#endif
523 524
}

5
54liuyao 已提交
525
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
526 527 528
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
529 530 531
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
532
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
533
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
534 535 536 537 538 539
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
540
#endif
5
54liuyao 已提交
541 542
}

543
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
544 545 546
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
547 548 549 550
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
551
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
552
  streamStateFreeCur(pCur);
553 554
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
555
#endif
556 557
}

558
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
559 560 561 562
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
563
  return tdbTbcMoveToFirst(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
564
#endif
565 566 567
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
568 569 570 571
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
572
  return tdbTbcMoveToLast(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
573
#endif
574 575
}

576
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
577 578 579
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
580 581 582 583 584
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
585
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
586
    streamStateFreeCur(pCur);
587 588 589 590
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
591
  int32_t c = 0;
592
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
593
    streamStateFreeCur(pCur);
594 595 596 597 598
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
599
    streamStateFreeCur(pCur);
600 601 602 603
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
604
#endif
605 606
}

5
54liuyao 已提交
607
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
608 609 610
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
611
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
612
  if (!pCur) {
613 614
    return NULL;
  }
5
54liuyao 已提交
615
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
616
    streamStateFreeCur(pCur);
5
54liuyao 已提交
617 618
    return NULL;
  }
619

5
54liuyao 已提交
620
  int32_t c = 0;
621
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
622
    streamStateFreeCur(pCur);
623 624 625 626 627
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
628
    streamStateFreeCur(pCur);
629 630 631 632
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
633
#endif
634 635
}

5
54liuyao 已提交
636
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
637 638 639
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
640 641 642 643
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
644
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
645
    streamStateFreeCur(pCur);
5
54liuyao 已提交
646 647
    return NULL;
  }
648

5
54liuyao 已提交
649
  int32_t c = 0;
650
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
651
    streamStateFreeCur(pCur);
652 653 654 655 656
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
657
    streamStateFreeCur(pCur);
658 659 660 661
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
662
#endif
663 664 665
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
666 667 668
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
669 670 671
  if (!pCur) {
    return -1;
  }
672 673
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
674
#endif
675 676 677
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
678 679 680
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
681 682 683
  if (!pCur) {
    return -1;
  }
684
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
685
#endif
686
}
687
void streamStateFreeCur(SStreamStateCur* pCur) {
688 689 690
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
691
  qDebug("streamStateFreeCur");
dengyihao's avatar
dengyihao 已提交
692
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
693
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
dengyihao's avatar
dengyihao 已提交
694 695
  rocksdb_readoptions_destroy(pCur->readOpt);

dengyihao's avatar
dengyihao 已提交
696
  tdbTbcClose(pCur->pCur);
697 698 699
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
700 701 702 703 704 705 706
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
707 708

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
709 710 711
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
712
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
713
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
714
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
715
#endif
5
54liuyao 已提交
716 717 718
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
719 720 721 722
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

723
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
724 725 726
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
727
  if (code == 0) {
5
54liuyao 已提交
728 729 730 731 732 733 734
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
735 736
  }
  streamStateFreeCur(pCur);
737
  return code;
dengyihao's avatar
dengyihao 已提交
738
#endif
5
54liuyao 已提交
739 740 741
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
742 743 744
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
745
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
746
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
747
#endif
5
54liuyao 已提交
748 749
}

750
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
751 752 753
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
754 755 756 757 758
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
759
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
760 761 762 763 764
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
765
  int32_t c = 0;
5
54liuyao 已提交
766 767 768 769
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
770
  if (c >= 0) return pCur;
5
54liuyao 已提交
771 772 773 774 775 776 777

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
778
#endif
5
54liuyao 已提交
779 780
}

781
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
782 783 784
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
785 786 787 788 789
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
790
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
791 792 793 794 795
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
796
  int32_t c = 0;
797 798 799 800 801 802 803 804 805 806 807 808 809
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
810
#endif
811 812
}

5
54liuyao 已提交
813
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
814 815 816
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
817 818 819 820 821
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
822
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
823 824 825 826 827
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
828
  int32_t c = 0;
5
54liuyao 已提交
829 830 831 832
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
833
  if (c < 0) return pCur;
5
54liuyao 已提交
834 835 836 837 838 839 840

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
841
#endif
5
54liuyao 已提交
842 843
}

844
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
845 846 847
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
848 849 850
  if (!pCur) {
    return -1;
  }
851
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
852
  int32_t kLen;
853
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
854 855 856 857 858 859 860 861 862 863
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
864
#endif
5
54liuyao 已提交
865 866 867
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
868 869 870 871
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
872
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
873 874
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
875 876 877
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
878
    if (code == 0 && size > 0) {
5
54liuyao 已提交
879 880 881 882 883 884 885 886 887
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
888
#endif
5
54liuyao 已提交
889 890
}

891
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
892 893 894
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
895 896 897 898 899
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
900
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
901 902
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
903 904
  }

905
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
906
  int32_t c = 0;
907 908 909 910 911 912
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
913
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
935 936
    }
  }
937

938
  streamStateFreeCur(pCur);
939
  return -1;
dengyihao's avatar
dengyihao 已提交
940
#endif
941 942
}

943 944
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
945 946 947
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
948
  // todo refactor
dengyihao's avatar
dengyihao 已提交
949
  int32_t res = 0;
950 951 952 953 954
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
955
  void* tmp = tdbRealloc(NULL, valSize);
956 957 958 959 960
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
961
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
962 963 964 965 966 967 968 969 970
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
971
    streamStateFreeCur(pCur);
972
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
973
  }
974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
991
  streamStateFreeCur(pCur);
992
  return res;
dengyihao's avatar
dengyihao 已提交
993 994

#endif
5
54liuyao 已提交
995 996 997 998 999
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
1000 1001 1002 1003 1004

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
1005
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
1006 1007
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
1008 1009 1010 1011
  if (!tmp) {
    return -1;
  }

1012
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
1013
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1014 1015 1016
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
1017
      streamStateSessionDel(pState, key);
1018 1019
      goto _end;
    }
5
54liuyao 已提交
1020 1021 1022 1023

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1024
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1025 1026
      goto _end;
    }
5
54liuyao 已提交
1027 1028 1029 1030 1031 1032

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
1033 1034
  }

1035
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1036
  if (code == 0) {
5
54liuyao 已提交
1037 1038 1039
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1040
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1054
#endif
5
54liuyao 已提交
1055
}
5
54liuyao 已提交
1056

1057
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
dengyihao's avatar
dengyihao 已提交
1058
  qWarn("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1059
#ifdef USE_ROCKSDB
L
liuyao 已提交
1060 1061 1062 1063 1064 1065 1066 1067
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1068
#else
L
Liu Jicong 已提交
1069 1070
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1071
#endif
1072 1073 1074
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1075
#ifdef USE_ROCKSDB
L
liuyao 已提交
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }
  *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1086
#else
1087
  int32_t len;
5
54liuyao 已提交
1088
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1089
#endif
5
54liuyao 已提交
1090 1091 1092
}

void streamStateDestroy(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
1093
#ifdef USE_ROCKSDB
5
54liuyao 已提交
1094
  streamFileStateDestroy(pState->pFileState);
dengyihao's avatar
dengyihao 已提交
1095
  streamStateDestroy_rocksdb(pState);
L
fix bug  
liuyao 已提交
1096
  tSimpleHashCleanup(pState->parNameMap);
dengyihao's avatar
dengyihao 已提交
1097 1098
  // do nothong
#endif
5
54liuyao 已提交
1099 1100
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1101 1102
}

L
liuyao 已提交
1103 1104 1105 1106 1107 1108 1109 1110
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#ifdef USE_ROCKSDB
  return deleteExpiredCheckPoint(pState->pFileState, mark);
#else
  return 0;
#endif
}

5
54liuyao 已提交
1111 1112 1113 1114 1115 1116 1117
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1118
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1119 1120 1121 1122 1123 1124
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1125 1126 1127
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1128
  if (code != 0) {
1129
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1144
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1145 1146 1147 1148 1149 1150
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1151
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1152 1153
  return dumpBuf;
}
5
54liuyao 已提交
1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1197
#endif