streamState.c 32.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

L
liuyao 已提交
29 30
#define MAX_TABLE_NAME_NUM  100000

dengyihao's avatar
dengyihao 已提交
31
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
47
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
69
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
70 71 72 73 74 75 76 77 78
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

79
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
80 81
}

dengyihao's avatar
dengyihao 已提交
82
/*
 * Comparator for SStateKey records passed as raw key buffers.
 *
 * Orders by opNum, then key.ts, then key.groupId; the first differing field
 * decides the result. kLen1/kLen2 are not consulted.
 */
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
  const SStateKey* pLeft = (const SStateKey*)pKey1;
  const SStateKey* pRight = (const SStateKey*)pKey2;

  if (pLeft->opNum != pRight->opNum) {
    return (pLeft->opNum > pRight->opNum) ? 1 : -1;
  }
  if (pLeft->key.ts != pRight->key.ts) {
    return (pLeft->key.ts > pRight->key.ts) ? 1 : -1;
  }
  if (pLeft->key.groupId != pRight->key.groupId) {
    return (pLeft->key.groupId > pRight->key.groupId) ? 1 : -1;
  }
  return 0;
}

107
/*
 * Allocate a stream state object and open its storage backend.
 *
 * path      base directory (or the exact state directory when specPath is true)
 * pTask     owning task; its id.taskId is appended to path when specPath is false
 * specPath  true: use path verbatim; false: use "<path>/<taskId>"
 * szPage    tdb page size, a negative value selects 4096 (tdb build only)
 * pages     tdb page count, a negative value selects 256 (tdb build only)
 *
 * Returns the new state, or NULL on failure.
 */
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
  qWarn("open stream state, %s", path);
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }

  char statePath[1024] = {0};
  if (!specPath) {
    // bounded formatting: path comes from the caller and may be arbitrarily long
    snprintf(statePath, sizeof(statePath), "%s/%d", path, pTask->id.taskId);
  } else {
    tstrncpy(statePath, path, sizeof(statePath));
  }
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    // Bail out here: pState must not be touched after it is released, and the
    // nested pTdbState has to be released with it.
    taosMemoryFree(pState->pTdbState);
    taosMemoryFree(pState);
    return NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
  pState->pFileState = NULL;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
  return pState;

#else

  char cfgPath[1030];
  snprintf(cfgPath, sizeof(cfgPath), "%s/cfg", statePath);

  szPage = szPage < 0 ? 4096 : szPage;
  pages = pages < 0 ? 256 : pages;
  char cfg[1024] = {0};
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    // an existing cfg file overrides the requested page size / page count
    int64_t size = 0;
    taosFStatFile(pCfgFile, &size, NULL);
    if (size > 0) {
      // never read more than the buffer holds, and keep the trailing NUL for sscanf
      if (size > (int64_t)sizeof(cfg) - 1) {
        size = (int64_t)sizeof(cfg) - 1;
      }
      taosReadFile(pCfgFile, cfg, size);
      sscanf(cfg, "%d\n%d\n", &szPage, &pages);
    }
  } else {
    // first open: create the directory and persist the chosen settings
    int32_t code = taosMulModeMkDir(statePath, 0755);
    if (code == 0) {
      pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
      if (pCfgFile != NULL) {
        snprintf(cfg, sizeof(cfg), "%d\n%d\n", szPage, pages);
        taosWriteFile(pCfgFile, cfg, strlen(cfg));
      }
    }
  }
  taosCloseFile(&pCfgFile);

  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
    goto _err;
  }

  // open state storage backend
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
    goto _err;
  }

  // todo refactor
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

  if (streamStateBegin(pState) < 0) {
    goto _err;
  }

  pState->pTdbState->pOwner = pTask;
  pState->checkPointId = 0;

  return pState;

_err:
  // tables that were never opened are still NULL here (pTdbState was calloc'd)
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
  tdbTbClose(pState->pTdbState->pParTagDb);
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
  return NULL;
#endif
}

/*
 * Shut down the storage backend of pState and destroy the state object.
 *
 * RocksDB build: delegates backend teardown to streamCleanBackend().
 * tdb build: commits the open transaction, then closes every table and the db.
 */
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
  streamCleanBackend(pState);
#else
  STdbState* pTdb = pState->pTdbState;

  tdbCommit(pTdb->db, pTdb->txn);
  tdbPostCommit(pTdb->db, pTdb->txn);

  tdbTbClose(pTdb->pStateDb);
  tdbTbClose(pTdb->pFuncStateDb);
  tdbTbClose(pTdb->pFillStateDb);
  tdbTbClose(pTdb->pSessionStateDb);
  tdbTbClose(pTdb->pParNameDb);
  tdbTbClose(pTdb->pParTagDb);
  tdbClose(pTdb->db);
#endif
  streamStateDestroy(pState);
}

/*
 * Start a write transaction on the tdb backend.
 *
 * RocksDB build: nothing to do, always returns 0.
 * tdb build: returns 0 on success; on failure the transaction is aborted and
 * -1 is returned.
 */
int32_t streamStateBegin(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;
  int32_t    rc = tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc >= 0) {
    return 0;
  }
  tdbAbort(pTdb->db, pTdb->txn);
  return -1;
#endif
}

/*
 * Persist the current state.
 *
 * RocksDB build: takes a snapshot of the file state and flushes it; returns 0.
 * tdb build: commits and post-commits the open transaction, opens a new one,
 * and bumps checkPointId. Returns -1 as soon as any of those steps fails.
 */
int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
  SStreamSnapshot* pSnapshot = getSnapshot(pState->pFileState);
  flushSnapshot(pState->pFileState, pSnapshot, true);
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;

  if (tdbCommit(pTdb->db, pTdb->txn) < 0) return -1;
  if (tdbPostCommit(pTdb->db, pTdb->txn) < 0) return -1;

  // immediately reopen a transaction so later writes have one to attach to
  if (tdbBegin(pTdb->db, &pTdb->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  pState->checkPointId += 1;
  return 0;
#endif
}

279
/*
 * Store (insert or overwrite) a function-state value under a tuple key.
 * Returns the status of the underlying backend call.
 */
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pTdb->txn);
#endif
}
/*
 * Look up a function-state value by tuple key.
 * The value and its length are handed back through pVal / pVLen; the return
 * value is the status of the underlying backend call.
 */
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbGet(pTdb->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
#endif
}

/*
 * Remove the function-state entry stored under the given tuple key.
 * Returns the status of the underlying backend call.
 */
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFuncStateDb, key, sizeof(STupleKey), pTdb->txn);
#endif
}

302
// todo refactor
303
/*
 * Store a window-state value.
 *
 * RocksDB build: currently a no-op that reports success.
 * NOTE(review): the direct streamStatePut_rocksdb() call was disabled in the
 * original; presumably writes go through the file-state row buffer instead - confirm.
 * tdb build: upserts the value under (key, current operator number).
 */
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return 0;
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pTdb->txn);
#endif
}
5
54liuyao 已提交
312

dengyihao's avatar
dengyihao 已提交
313
/*
 * Fetch a window-state value.
 *
 * RocksDB build: served from the file-state row buffer.
 * tdb build: looked up under (key, current operator number).
 * The value and its length are returned through pVal / pVLen.
 */
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pTdb->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}
5
54liuyao 已提交
321 322 323 324 325 326 327 328 329 330 331

/*
 * Report whether a window-state entry exists for the given key.
 *
 * RocksDB build: asks the file-state row buffer.
 * tdb build: performs a lookup under (key, current operator number) and
 * returns true only when that lookup succeeds.
 */
bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
#else
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  void*     pVal = NULL;
  int32_t   vLen = 0;
  int32_t   code = tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pVal, &vLen);
  // Only existence matters; release the fetched value the same way
  // streamStateReleaseBuf() releases values obtained from streamStateGet().
  if (pVal != NULL) {
    streamFreeVal(pVal);
  }
  // tdbTbGet reports success as 0, so the status must be converted, not returned as-is
  return code == 0;
#endif
}

/*
 * Resolve a row-buffer position to its value, then release the position.
 * The position is released regardless of whether the lookup succeeded; the
 * lookup status is returned.
 */
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
  const int32_t status = getRowBuffByPos(pState->pFileState, pos, pVal);
  releaseRowBuffPos(pos);
  return status;
}

337
// todo refactor
dengyihao's avatar
dengyihao 已提交
338
/*
 * Delete a window-state entry.
 *
 * RocksDB build: removes the row from the file-state row buffer.
 * tdb build: deletes the record stored under (key, current operator number).
 */
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
#else
  STdbState* pTdb = pState->pTdbState;
  SStateKey  sKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pStateDb, &sKey, sizeof(SStateKey), pTdb->txn);
#endif
}

// todo refactor
/*
 * Store (insert or overwrite) a fill-state value under a window key.
 * Returns the status of the underlying backend call.
 */
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbUpsert(pTdb->pFillStateDb, key, sizeof(SWinKey), value, vLen, pTdb->txn);
#endif
}

5
54liuyao 已提交
356 357
// todo refactor
/*
 * Look up a fill-state value by window key.
 * The value and its length are handed back through pVal / pVLen; the return
 * value is the status of the underlying backend call.
 */
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbGet(pTdb->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
#endif
}

365
// todo refactor
dengyihao's avatar
dengyihao 已提交
366
/*
 * Remove the fill-state entry stored under the given window key.
 * Returns the status of the underlying backend call.
 */
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillDel_rocksdb(pState, key);
#else
  STdbState* pTdb = pState->pTdbState;
  return tdbTbDelete(pTdb->pFillStateDb, key, sizeof(SWinKey), pTdb->txn);
#endif
}

374
/*
 * Drop the window state of the current operator.
 *
 * RocksDB build: clears the file state, then delegates to the RocksDB helper.
 * tdb build: writes an empty record under the zero key, then repeatedly seeks
 * from that key, deleting whatever key the cursor yields, until the cursor no
 * longer yields an entry for this operator. Always returns 0.
 */
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  streamFileStateClear(pState->pFileState);
  return streamStateClear_rocksdb(pState);
#else
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);

  for (;;) {
    SWinKey          delKey = {0};
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
    int32_t          code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
    streamStateFreeCur(pCur);
    if (code != 0) {
      break;
    }
    streamStateDel(pState, &delKey);
  }
  return 0;
#endif
}

/*
 * Record the operator number on the state object. The tdb code paths in this
 * file embed that number (as opNum) in the keys they build.
 */
void streamStateSetNumber(SStreamState* pState, int32_t number) {
  pState->number = number;
}

398
/*
 * Fetch the value stored under key, creating a zero-filled one if it is absent.
 *
 * RocksDB build: plain streamStateGet().
 * tdb build: on entry *pVLen holds the size to allocate when the key is
 * missing. If the lookup succeeds its result is returned as-is; otherwise a
 * zeroed buffer of that size is returned through pVal.
 *
 * Returns 0 on success, -1 (with terrno set) when the buffer cannot be allocated.
 */
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGet(pState, key, pVal, pVLen);
#else
  // todo refactor
  int32_t size = *pVLen;  // remember the requested size before the lookup runs
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  void* pNew = tdbRealloc(NULL, size);
  if (pNew == NULL) {
    *pVal = NULL;
    if (size > 0) {
      // a real allocation failure: report it instead of writing through NULL
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    return 0;
  }
  memset(pNew, 0, size);
  *pVal = pNew;
  return 0;
#endif
}

/*
 * Release a value buffer previously handed out by this module.
 * A NULL buffer is accepted and ignored. pState and key are not used.
 * Always returns 0.
 */
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
  qDebug("streamStateReleaseBuf");
  if (pVal != NULL) {
    // streamFreeVal() picks the allocator-specific free for the active backend
    streamFreeVal(pVal);
  }
  return 0;
}

5
54liuyao 已提交
427
/*
 * Open a cursor on the fill-state table positioned exactly on key.
 *
 * Returns the cursor when the table holds an entry that compares equal to key,
 * otherwise NULL (also on allocation or cursor failure). The caller releases
 * the cursor with streamStateFreeCur().
 */
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  // check the cursor calls the same way the other seek helpers in this file do
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t c = 0;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0 || c != 0) {
    // move failed, or the cursor did not land on an exact match
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
#endif
}

/*
 * Obtain a fill-state cursor positioned on key and verify that the entry it
 * points at belongs to the same group. On success key is overwritten with the
 * stored key and the cursor is returned; otherwise NULL is returned.
 */
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur == NULL) {
    return NULL;
  }
  if (streamStateGetGroupKVByCur(pCur, key, NULL, 0) == 0) {
    return pCur;
  }
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

461
/*
 * Read the key/value the window-state cursor currently points at.
 *
 * Returns 0 and fills pKey (plus pVal / pVLen as supplied to the backend) on
 * success. Returns -1 when pCur is NULL, the cursor read fails, or the entry
 * belongs to a different operator number than the cursor.
 */
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  const SStateKey* pStored = NULL;
  int32_t          storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  // entries of other operators share the table; they do not count as hits
  if (pStored->opNum != pCur->number) return -1;

  *pKey = pStored->key;
  return 0;
#endif
}

/*
 * Read the key/value the fill-state cursor currently points at.
 * Returns 0 and fills pKey on success, -1 when pCur is NULL or the cursor
 * read fails.
 */
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  const SWinKey* pStored = NULL;
  int32_t        storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, pVal, pVLen) < 0) return -1;

  *pKey = *pStored;
  return 0;
#endif
}

5
54liuyao 已提交
498
/*
 * Like streamStateFillGetKVByCur(), but only succeeds when the entry under the
 * cursor has the same groupId that pKey carried on entry.
 * Note that pKey is overwritten by the read even when -1 is returned because
 * of a group mismatch.
 */
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  const uint64_t wantedGroup = pKey->groupId;  // capture before pKey is overwritten
  const int32_t  code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
  return (code == 0 && pKey->groupId == wantedGroup) ? 0 : -1;
#endif
}

516
/*
 * Retrieve the first window key of the current operator.
 *
 * tdb build: temporarily inserts an empty record under the zero key, seeks to
 * the entry following it, reads that entry's key into `key`, and removes the
 * temporary record again. Returns the status of the cursor read.
 */
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
  // todo refactor
  SWinKey sentinel = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &sentinel, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &sentinel);
  int32_t          status = streamStateGetKVByCur(pCur, key, NULL, 0);
  streamStateFreeCur(pCur);

  streamStateDel(pState, &sentinel);
  return status;
#endif
}

531
/*
 * Move the cursor to the first entry.
 * RocksDB build: repositions the iterator and returns 0.
 * tdb build: returns the status of tdbTbcMoveToFirst().
 * pState is not used.
 */
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
  int32_t status = tdbTbcMoveToFirst(pCur->pCur);
  return status;
#endif
}

/*
 * Move the cursor to the last entry.
 * RocksDB build: repositions the iterator and returns 0.
 * tdb build: returns the status of tdbTbcMoveToLast().
 * pState is not used.
 */
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
  int32_t status = tdbTbcMoveToLast(pCur->pCur);
  return status;
#endif
}

549
/*
 * Open a window-state cursor positioned after key (for the current operator).
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  int32_t   cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &cmp) < 0) goto _fail;

  // cmp > 0: keep the position tdbTbcMoveTo chose; otherwise step one entry forward
  // NOTE(review): presumably cmp > 0 means the cursor already rests past the key - confirm
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
580
/*
 * Open a fill-state cursor positioned after key.
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  // cmp > 0: keep the position tdbTbcMoveTo chose; otherwise step one entry forward
  if (cmp <= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
609
/*
 * Open a fill-state cursor positioned before key.
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;

  int32_t cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &cmp) < 0) goto _fail;

  // cmp < 0: keep the position tdbTbcMoveTo chose; otherwise step one entry back
  if (cmp >= 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

/*
 * Advance the cursor by one entry.
 * tdb build: returns -1 for a NULL cursor, otherwise the status of
 * tdbTbcMoveToNext().
 */
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
  return (pCur == NULL) ? -1 : tdbTbcMoveToNext(pCur->pCur);
#endif
}

/*
 * Move the cursor back by one entry.
 * tdb build: returns -1 for a NULL cursor, otherwise the status of
 * tdbTbcMoveToPrev().
 */
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
  return (pCur == NULL) ? -1 : tdbTbcMoveToPrev(pCur->pCur);
#endif
}
660
/*
 * Release a cursor and everything it owns. A NULL cursor is ignored.
 *
 * A cursor may carry RocksDB resources (iter / snapshot / readOpt), a tdb
 * cursor (pCur), or only some of them: the tdb helpers in this file calloc the
 * cursor and never set the RocksDB fields. Each RocksDB resource is therefore
 * released only when it is actually present.
 */
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }
  qDebug("streamStateFreeCur");

  // rocksdb_iter_destroy() dereferences its argument, so it must not see NULL
  if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
  if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);

  tdbTbcClose(pCur->pCur);
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
673 674 675 676 677 678 679
/*
 * Free a value buffer with the deallocator matching the active backend:
 * taosMemoryFree() in the RocksDB build, tdbFree() otherwise.
 */
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
680 681

/*
 * Store (insert or overwrite) a session-state value.
 * tdb build: the record is keyed by (session key, current operator number).
 * Returns the status of the underlying backend call.
 */
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbUpsert(pTdb->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, pTdb->txn);
#endif
}

/*
 * Fetch the session-state value whose window starts at key->win.skey.
 *
 * tdb build: seeks to the session at or after key; the lookup succeeds only
 * when that session has the same skey. On success *key is replaced by the
 * stored key and *pVal receives a private copy of the value (*pVLen bytes).
 *
 * Returns 0 on success, -1 when no matching session exists or the copy cannot
 * be allocated (terrno is set in the latter case).
 */
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
  if (code == 0) {
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // copy the value out before the cursor that backs tmp is released below
      void* pCopy = tdbRealloc(NULL, *pVLen);
      if (pCopy == NULL && *pVLen > 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = -1;
      } else {
        if (pCopy != NULL && *pVLen > 0) {
          memcpy(pCopy, tmp, *pVLen);
        }
        *key = resKey;
        *pVal = pCopy;
      }
    }
  }
  streamStateFreeCur(pCur);
  return code;
#endif
}

/*
 * Delete a session-state entry.
 * tdb build: removes the record keyed by (session key, current operator number).
 * Returns the status of the underlying backend call.
 */
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
  STdbState*       pTdb = pState->pTdbState;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbDelete(pTdb->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pTdb->txn);
#endif
}

723
/*
 * Open a session-state cursor positioned on key or, failing that, on the
 * entry before it (for the current operator).
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  // cmp >= 0: keep the position tdbTbcMoveTo chose; otherwise step one entry back
  if (cmp < 0 && tdbTbcMoveToPrev(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

754
/*
 * Open a session-state cursor positioned on key or, failing that, on the
 * entry after it (for the current operator).
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  // cmp <= 0: keep the position tdbTbcMoveTo chose; otherwise step one entry forward
  if (cmp > 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

5
54liuyao 已提交
786
/*
 * Open a session-state cursor positioned strictly after key (for the current
 * operator).
 *
 * Returns NULL on allocation or cursor failure; the caller releases the cursor
 * with streamStateFreeCur().
 */
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pCur) return NULL;
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) goto _fail;
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &cmp) < 0) goto _fail;

  // cmp < 0: keep the position tdbTbcMoveTo chose; otherwise step one entry forward
  if (cmp >= 0 && tdbTbcMoveToNext(pCur->pCur) < 0) goto _fail;
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
#endif
}

817
/*
 * Read the session key/value the cursor currently points at.
 *
 * On entry pKey->groupId acts as a filter: 0 accepts any group, any other
 * value must match the stored entry. Returns 0 and overwrites pKey with the
 * stored key on success; returns -1 when pCur is NULL, the cursor read fails,
 * the entry belongs to another operator number, or the group filter rejects it.
 */
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
  if (pCur == NULL) return -1;

  SStateSessionKey* pStored = NULL;
  int32_t           storedLen;
  if (tdbTbcGet(pCur->pCur, (const void**)&pStored, &storedLen, (const void**)pVal, pVLen) < 0) {
    return -1;
  }

  const bool sameOperator = (pStored->opNum == pCur->number);
  const bool groupAccepted = (pKey->groupId == 0) || (pKey->groupId == pStored->key.groupId);
  if (!sameOperator || !groupAccepted) {
    return -1;
  }

  *pKey = pStored->key;
  return 0;
#endif
}

// Overwrite the value of every session entry reachable from the zero key with
// zero bytes, keeping the keys themselves.
//
// Iteration stops at the first entry that cannot be read or whose value is
// empty. Always returns 0.
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pIter = streamStateSessionSeekKeyCurrentNext(pState, &startKey);

  for (;;) {
    SSessionKey curKey = {0};
    void*       pData = NULL;
    int32_t     dataLen = 0;

    if (streamStateSessionGetKVByCur(pIter, &curKey, &pData, &dataLen) != 0 || dataLen <= 0) {
      break;
    }

    // Zero the value in place and write it back under the same key.
    memset(pData, 0, dataLen);
    streamStateSessionPut(pState, &curKey, pData, dataLen);
    streamStateCurNext(pState, pIter);
  }

  streamStateFreeCur(pIter);
  return 0;
#endif
}

864
// Look up a stored session key whose window overlaps `key` (as judged by
// sessionRangeKeyCmpr) within the same group.
//
// The cursor is seeked to `key`; the entry under the cursor is checked first,
// and if it does not match, one neighbour is checked: the next entry when the
// seek comparison result is > 0, the previous entry when it is < 0.
//
// On a match the stored key is copied to *curKey and 0 is returned; otherwise
// (or on allocation / cursor failure) -1 is returned and *curKey is untouched.
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
  SStreamStateCur* pIter = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (!pIter) {
    return -1;
  }
  pIter->number = pState->number;

  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pIter->pCur, NULL) < 0) {
    streamStateFreeCur(pIter);
    return -1;
  }

  SStateSessionKey seekKey = {.key = *key, .opNum = pState->number};
  int32_t          cmp = 0;
  if (tdbTbcMoveTo(pIter->pCur, &seekKey, sizeof(SStateSessionKey), &cmp) < 0) {
    streamStateFreeCur(pIter);
    return -1;
  }

  // `candidate` doubles as the group filter for the read and as the output slot.
  SSessionKey candidate = *key;
  int32_t     code = streamStateSessionGetKVByCur(pIter, &candidate, NULL, 0);
  bool        found = (code == 0) && (sessionRangeKeyCmpr(key, &candidate) == 0);

  if (!found && cmp != 0) {
    // Probe exactly one neighbour, chosen by the sign of the seek comparison.
    if (cmp > 0) {
      streamStateCurNext(pState, pIter);
    } else {
      streamStateCurPrev(pState, pIter);
    }
    code = streamStateSessionGetKVByCur(pIter, &candidate, NULL, 0);
    found = (code == 0) && (sessionRangeKeyCmpr(key, &candidate) == 0);
  }

  int32_t ret = -1;
  if (found) {
    *curKey = candidate;
    ret = code;
  }
  streamStateFreeCur(pIter);
  return ret;
#endif
}

916 917
// Fetch the session window that overlaps `key` widened by `gap` on both sides,
// or prepare an empty one if none exists.
//
// key   - in: requested window; out: the stored window's key when one is
//         reused, otherwise restored to the requested value.
// gap   - session gap subtracted from skey / added to ekey for the overlap test.
// pVal  - out: a buffer of the incoming *pVLen bytes obtained from tdbRealloc,
//         holding a copy of the reused window's value or all zeros
//         (presumably released by the caller -- confirm).
// pVLen - in: size of the value buffer; also forwarded to the cursor read.
//
// Returns 0 when an existing window was reused (it is deleted from the store),
// 1 when no overlapping window exists, -1 if the buffer cannot be allocated.
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
  // todo refactor
  const SSessionKey requested = *key;
  SSessionKey       probe = *key;
  probe.win.skey = key->win.skey - gap;
  probe.win.ekey = key->win.ekey + gap;

  const int32_t bufLen = *pVLen;
  void*         pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool reuse = false;

  // First candidate: the entry at or before the requested key.
  SStreamStateCur* pIter = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pIter, key, pVal, pVLen) == 0) {
    reuse = (sessionRangeKeyCmpr(&probe, key) == 0);
    if (!reuse) {
      streamStateCurNext(pState, pIter);
    }
  } else {
    *key = requested;
    streamStateFreeCur(pIter);
    pIter = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second candidate: the following entry.
  if (!reuse && streamStateSessionGetKVByCur(pIter, key, pVal, pVLen) == 0) {
    reuse = (sessionRangeKeyCmpr(&probe, key) == 0);
  }

  int32_t res = 0;
  if (reuse) {
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = requested;
    res = 1;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pIter);
  return res;

#endif
}

// Fetch the state window that `key` belongs to, or prepare an empty one.
//
// A stored window is reused when it fully contains the requested window
// (first candidate only) or when `fn` reports that `pKeyData` matches the
// state key kept in the last `keyDataLen` bytes of the stored value.
//
// key        - in: requested window; out: the reused window's key, or the
//              requested value when nothing matched.
// pKeyData   - state key to compare against the stored one.
// keyDataLen - length of the state key stored at the tail of the value.
// fn         - comparison callback; a result equal to `true` means "same state".
// pVal       - out: a buffer of the incoming *pVLen bytes obtained from
//              tdbRealloc, holding a copy of the reused value or all zeros
//              (presumably released by the caller -- confirm).
// pVLen      - in: size of the value buffer; also forwarded to the cursor read.
//
// Returns 0 when an existing window was reused (it is deleted from the store),
// 1 when none matched, -1 if the buffer cannot be allocated.
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  const SSessionKey requested = *key;
  const int32_t     bufLen = *pVLen;
  void*             pOut = tdbRealloc(NULL, bufLen);
  if (pOut == NULL) {
    return -1;
  }

  bool reuse = false;

  // First candidate: the entry at or before the requested key.
  SStreamStateCur* pIter = streamStateSessionSeekKeyCurrentPrev(pState, key);
  if (streamStateSessionGetKVByCur(pIter, key, pVal, pVLen) == 0) {
    reuse = (key->win.skey <= requested.win.skey && requested.win.ekey <= key->win.ekey);
    if (!reuse) {
      void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
      reuse = (fn(pKeyData, storedState) == true);
    }
    if (!reuse) {
      streamStateCurNext(pState, pIter);
    }
  } else {
    *key = requested;
    streamStateFreeCur(pIter);
    pIter = streamStateSessionSeekKeyNext(pState, key);
  }

  // Second candidate: the following entry, matched on state key only.
  if (!reuse && streamStateSessionGetKVByCur(pIter, key, pVal, pVLen) == 0) {
    void* storedState = (char*)(*pVal) + (bufLen - keyDataLen);
    reuse = (fn(pKeyData, storedState) == true);
  }

  int32_t res = 0;
  if (reuse) {
    memcpy(pOut, *pVal, bufLen);
    streamStateSessionDel(pState, key);
  } else {
    *key = requested;
    res = 1;
    memset(pOut, 0, bufLen);
  }

  *pVal = pOut;
  streamStateFreeCur(pIter);
  return res;
#endif
}
5
54liuyao 已提交
1029

1030
// Record the partition table name for `groupId`.
//
// With USE_ROCKSDB the name is kept in the in-memory parNameMap while the map
// holds at most MAX_TABLE_NAME_NUM entries. Once the map is over that limit,
// a group id that is not already in the map is written to the rocksdb backend
// instead, and a group id that is already in the map is left unchanged.
// Without USE_ROCKSDB the name is upserted into the tdb parname table.
//
// Returns the result of the underlying store operation, so a failed hash
// insert or backend write is reported to the caller instead of being silently
// turned into success; TSDB_CODE_SUCCESS when nothing had to be written.
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
  if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
    if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
      // Map is full and does not know this group: spill to the backend.
      return streamStatePutParName_rocksdb(pState, groupId, tbname);
    }
    return TSDB_CODE_SUCCESS;
  }
  return tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
#else
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
#endif
}

// Look up the partition table name stored for `groupId`.
//
// With USE_ROCKSDB the in-memory parNameMap is consulted first; on a miss the
// rocksdb backend is queried only when the map is over MAX_TABLE_NAME_NUM
// entries (the condition under which streamStatePutParName spills to it).
// On a map hit, *pVal receives a newly allocated TSDB_TABLE_NAME_LEN-byte copy
// of the name (presumably freed by the caller -- confirm).
//
// Returns TSDB_CODE_SUCCESS on a hit; TSDB_CODE_FAILED when the name is not
// found or the copy cannot be allocated (in which case *pVal is not modified).
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
  if (!pStr) {
    if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
      return streamStateGetParName_rocksdb(pState, groupId, pVal);
    }
    return TSDB_CODE_FAILED;
  }

  void* pCopy = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCopy == NULL) {
    // Previously the NULL result was passed straight to memcpy.
    return TSDB_CODE_FAILED;
  }
  memcpy(pCopy, pStr, TSDB_TABLE_NAME_LEN);
  *pVal = pCopy;
  return TSDB_CODE_SUCCESS;
#else
  int32_t len;
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
#endif
}

// Release a stream state object and everything it owns.
//
// With USE_ROCKSDB this destroys the file state, the rocksdb-side state and
// the parNameMap hash; in all builds it then frees pTdbState and pState
// itself. pState must not be used after this call.
void streamStateDestroy(SStreamState* pState) {
#ifdef USE_ROCKSDB
  streamFileStateDestroy(pState->pFileState);
  streamStateDestroy_rocksdb(pState);
  // parNameMap is a simple-hash object (it is accessed through tSimpleHashGet /
  // tSimpleHashPut), so it must be torn down with the hash API; freeing only
  // the handle would leak the table's internal storage.
  tSimpleHashCleanup(pState->parNameMap);
  pState->parNameMap = NULL;
#endif
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}

L
liuyao 已提交
1076 1077 1078 1079 1080 1081 1082 1083
// Delete expired checkpoints of the file state, using `mark` as the cut-off
// passed to deleteExpiredCheckPoint. Without USE_ROCKSDB this is a no-op that
// returns 0.
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  code = deleteExpiredCheckPoint(pState->pFileState, mark);
#endif
  return code;
}

5
54liuyao 已提交
1084 1085 1086 1087 1088 1089 1090
#if 0
// Debug helper: render the session-state table as text.
//
// Walks the table from the first entry and appends one
// "||s:<skey>,e:<ekey>,g:<groupId>||" record per entry belonging to this
// operator. The output buffer is fixed at 2048 bytes; when the next record
// would not fit, the dump stops and the partial record is discarded, so the
// result is always NUL-terminated and never written past the buffer.
//
// Returns a buffer allocated with taosMemoryCalloc (presumably freed by the
// caller -- confirm), or NULL if the cursor cannot be created, the table has
// no readable first entry, or the buffer cannot be allocated.
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    // snprintf returns the length it WOULD have written; accumulating that
    // blindly lets `len` run past `size`, so check before advancing.
    int32_t n = snprintf(dumpBuf + len, (size_t)(size - len),
                         "||s:%15" PRId64 "," "e:%15" PRId64 "," "g:%15" PRId64 "||",
                         key.win.skey, key.win.ekey, key.groupId);
    if (n < 0 || n >= size - len) {
      dumpBuf[len] = '\0';  // drop the truncated record
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169

// Debug helper: render the interval-state table as text.
//
// Walks the table from the first entry and appends one
// "||s:<ts>,g:<groupId>||" record per entry that streamStateGetKVByCur
// accepts. The output buffer is fixed at 2048 bytes; when the next record
// would not fit, the dump stops and the partial record is discarded, so the
// result is always NUL-terminated and never written past the buffer.
//
// Returns a buffer allocated with taosMemoryCalloc (presumably freed by the
// caller -- confirm), or NULL if the cursor cannot be created, the table has
// no readable first entry, or the buffer cannot be allocated.
char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*   buf = NULL;
  int32_t bufSize = 0;
  int32_t code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  const int32_t size = 2048;
  char*         dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int64_t len = 0;
  while (code == 0) {
    // snprintf returns the length it WOULD have written; accumulating that
    // blindly lets `len` run past `size`, so check before advancing.
    int32_t n = snprintf(dumpBuf + len, (size_t)(size - len),
                         "||s:%15" PRId64 "," "g:%15" PRId64 "||", key.ts, key.groupId);
    if (n < 0 || n >= size - len) {
      dumpBuf[len] = '\0';  // drop the truncated record
      break;
    }
    len += n;

    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
  }

  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1170
#endif