streamState.c 31.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "streamState.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include <bits/stdint-uintn.h>
#include <string.h>
19
#include "executor.h"
dengyihao's avatar
dengyihao 已提交
20
#include "osMemory.h"
dengyihao's avatar
dengyihao 已提交
21
#include "rocksdb/c.h"
dengyihao's avatar
dengyihao 已提交
22
#include "streamBackendRocksdb.h"
23
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcoding.h"
25
#include "tcommon.h"
26
#include "tcompare.h"
27 28
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
5
54liuyao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
45
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  if (pWin1->win.ekey > pWin2->win.ekey) {
    return 1;
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
67
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
5
54liuyao 已提交
68 69 70 71 72 73 74 75 76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

77
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
5
54liuyao 已提交
78 79
}

dengyihao's avatar
dengyihao 已提交
80
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
  SStateKey* pWin1 = (SStateKey*)pKey1;
  SStateKey* pWin2 = (SStateKey*)pKey2;

  if (pWin1->opNum > pWin2->opNum) {
    return 1;
  } else if (pWin1->opNum < pWin2->opNum) {
    return -1;
  }

  if (pWin1->key.ts > pWin2->key.ts) {
    return 1;
  } else if (pWin1->key.ts < pWin2->key.ts) {
    return -1;
  }

  if (pWin1->key.groupId > pWin2->key.groupId) {
    return 1;
  } else if (pWin1->key.groupId < pWin2->key.groupId) {
    return -1;
  }

  return 0;
}

105
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
dengyihao's avatar
dengyihao 已提交
106
  qWarn("open stream state, %s", path);
107 108 109 110 111
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
5
54liuyao 已提交
112 113 114 115 116 117
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
  if (pState->pTdbState == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    streamStateDestroy(pState);
    return NULL;
  }
L
Liu Jicong 已提交
118

119
  char statePath[1024];
L
Liu Jicong 已提交
120 121 122
  if (!specPath) {
    sprintf(statePath, "%s/%d", path, pTask->taskId);
  } else {
123 124
    memset(statePath, 0, 1024);
    tstrncpy(statePath, path, 1024);
L
Liu Jicong 已提交
125
  }
dengyihao's avatar
dengyihao 已提交
126 127 128 129 130 131 132 133 134 135 136 137
#ifdef USE_ROCKSDB
  qWarn("open stream state1");
  int code = streamInitBackend(pState, statePath);
  if (code == -1) {
    taosMemoryFree(pState);
    pState = NULL;
  }
  qWarn("open stream state2, %s", statePath);
  pState->pTdbState->pOwner = pTask;
  return pState;

#else
L
add cfg  
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
  char cfgPath[1030];
L
add cfg  
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152
  sprintf(cfgPath, "%s/cfg", statePath);

  char cfg[1024];
  memset(cfg, 0, 1024);
  TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
  if (pCfgFile != NULL) {
    int64_t size;
    taosFStatFile(pCfgFile, &size, NULL);
    taosReadFile(pCfgFile, cfg, size);
    sscanf(cfg, "%d\n%d\n", &szPage, &pages);
  } else {
    taosMulModeMkDir(statePath, 0755);
    pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
L
Liu Jicong 已提交
153 154
    szPage = szPage < 0 ? 4096 : szPage;
    pages = pages < 0 ? 256 : pages;
L
add cfg  
Liu Jicong 已提交
155 156 157 158 159
    sprintf(cfg, "%d\n%d\n", szPage, pages);
    taosWriteFile(pCfgFile, cfg, strlen(cfg));
  }
  taosCloseFile(&pCfgFile);

160
  if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
161 162 163 164
    goto _err;
  }

  // open state storage backend
5
54liuyao 已提交
165 166
  if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
                0) < 0) {
167 168 169
    goto _err;
  }

5
54liuyao 已提交
170
  // todo refactor
5
54liuyao 已提交
171 172
  if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFillStateDb, 0) < 0) {
5
54liuyao 已提交
173 174 175
    goto _err;
  }

5
54liuyao 已提交
176 177
  if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pSessionStateDb, 0) < 0) {
5
54liuyao 已提交
178 179 180
    goto _err;
  }

5
54liuyao 已提交
181 182
  if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
                &pState->pTdbState->pFuncStateDb, 0) < 0) {
183 184 185
    goto _err;
  }

5
54liuyao 已提交
186 187
  if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
                &pState->pTdbState->pParNameDb, 0) < 0) {
188 189 190
    goto _err;
  }

L
Liu Jicong 已提交
191 192 193 194 195
  if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
      0) {
    goto _err;
  }

196
  if (streamStateBegin(pState) < 0) {
197 198 199
    goto _err;
  }

5
54liuyao 已提交
200
  pState->pTdbState->pOwner = pTask;
201 202 203 204

  return pState;

_err:
5
54liuyao 已提交
205 206 207 208 209
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
210
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
211 212
  tdbClose(pState->pTdbState->db);
  streamStateDestroy(pState);
213
  return NULL;
dengyihao's avatar
dengyihao 已提交
214
#endif
215 216 217
}

void streamStateClose(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
218
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
219
  streamCleanBackend(pState);
dengyihao's avatar
dengyihao 已提交
220
#else
221 222
  tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
  tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
5
54liuyao 已提交
223 224 225 226 227
  tdbTbClose(pState->pTdbState->pStateDb);
  tdbTbClose(pState->pTdbState->pFuncStateDb);
  tdbTbClose(pState->pTdbState->pFillStateDb);
  tdbTbClose(pState->pTdbState->pSessionStateDb);
  tdbTbClose(pState->pTdbState->pParNameDb);
L
Liu Jicong 已提交
228
  tdbTbClose(pState->pTdbState->pParTagDb);
5
54liuyao 已提交
229
  tdbClose(pState->pTdbState->db);
dengyihao's avatar
dengyihao 已提交
230
#endif
5
54liuyao 已提交
231
  streamStateDestroy(pState);
232 233 234
}

int32_t streamStateBegin(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
235 236 237
#ifdef USE_ROCKSDB
  return 0;
#else
238
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
239
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
240
    tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
241 242 243
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
244
#endif
245 246 247
}

int32_t streamStateCommit(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
248 249 250
#ifdef USE_ROCKSDB
  return 0;
#else
251
  if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
M
Minglei Jin 已提交
252 253
    return -1;
  }
254
  if (tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
255 256
    return -1;
  }
257

258
  if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
259
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
260 261 262
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
263
#endif
264 265
}

266
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
267 268 269
#ifdef USE_ROCKSDB
  return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
270
  return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
271
#endif
272 273
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
274
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
275
  return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
276
#else
5
54liuyao 已提交
277
  return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
278
#endif
279 280 281
}

int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
dengyihao's avatar
dengyihao 已提交
282 283 284
#ifdef USE_ROCKSDB
  return streamStateFuncDel_rocksdb(pState, key);
#else
285
  return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
286
#endif
287 288
}

289
// todo refactor
290
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
291
#ifdef USE_ROCKSDB
5
54liuyao 已提交
292 293
  return 0;
  // return streamStatePut_rocksdb(pState, key, value, vLen);
dengyihao's avatar
dengyihao 已提交
294
#else
295
  SStateKey sKey = {.key = *key, .opNum = pState->number};
296
  return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
297
#endif
298
}
5
54liuyao 已提交
299 300

// todo refactor
dengyihao's avatar
dengyihao 已提交
301
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
302
#ifdef USE_ROCKSDB
5
54liuyao 已提交
303 304
  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
  // return streamStateGet_rocksdb(pState, key, pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
305
#else
dengyihao's avatar
dengyihao 已提交
306 307
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
308
#endif
5
54liuyao 已提交
309
}
310
// todo refactor
dengyihao's avatar
dengyihao 已提交
311
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
312
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
313
  return streamStateDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
314
#else
315
  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
316 317 318 319 320 321 322 323 324 325
  return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}

// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
  return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
326
#endif
327 328
}

5
54liuyao 已提交
329 330
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
331 332 333
#ifdef USE_ROCKSDB
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
5
54liuyao 已提交
334
  return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
dengyihao's avatar
dengyihao 已提交
335
#endif
5
54liuyao 已提交
336 337
}

338
// todo refactor
dengyihao's avatar
dengyihao 已提交
339
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
340
#ifdef USE_ROCKSDB
dengyihao's avatar
dengyihao 已提交
341
  return streamStateFillDel_rocksdb(pState, key);
dengyihao's avatar
dengyihao 已提交
342
#else
dengyihao's avatar
dengyihao 已提交
343
  return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
344
#endif
345 346
}

347
int32_t streamStateClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
348 349 350
#ifdef USE_ROCKSDB
  return streamStateClear_rocksdb(pState);
#else
351 352 353 354
  SWinKey key = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &key, NULL, 0);
  while (1) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
dengyihao's avatar
dengyihao 已提交
355 356
    SWinKey delKey = {0};
    int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
5
54liuyao 已提交
357
    streamStateFreeCur(pCur);
358 359 360 361 362 363 364
    if (code == 0) {
      streamStateDel(pState, &delKey);
    } else {
      break;
    }
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
365
#endif
366 367 368 369
}

void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }

370
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
371 372 373
#ifdef USE_ROCKSDB
  return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
#else
374 375 376 377 378
  // todo refactor
  int32_t size = *pVLen;
  if (streamStateGet(pState, key, pVal, pVLen) == 0) {
    return 0;
  }
5
54liuyao 已提交
379 380 381
  *pVal = tdbRealloc(NULL, size);
  memset(*pVal, 0, size);
  return 0;
dengyihao's avatar
dengyihao 已提交
382
#endif
383 384 385 386
}

int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
387
  qDebug("streamStateReleaseBuf");
5
54liuyao 已提交
388 389 390
  if (!pVal) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
391 392 393
#ifdef USE_ROCKSDB
  taosMemoryFree(pVal);
#else
394
  streamFreeVal(pVal);
dengyihao's avatar
dengyihao 已提交
395
#endif
396 397 398
  return 0;
}

399
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
400 401 402
#ifdef USE_ROCKSDB
  return streamStateGetCur_rocksdb(pState, key);
#else
403 404
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
405
  tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
406

dengyihao's avatar
dengyihao 已提交
407
  int32_t c = 0;
408
  SStateKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
409
  tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
410
  if (c != 0) {
5
54liuyao 已提交
411
    streamStateFreeCur(pCur);
412 413
    return NULL;
  }
414
  pCur->number = pState->number;
415
  return pCur;
dengyihao's avatar
dengyihao 已提交
416
#endif
417 418
}

5
54liuyao 已提交
419
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
420 421 422
#ifdef USE_ROCKSDB
  return streamStateFillGetCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
423 424
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) return NULL;
5
54liuyao 已提交
425
  tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
5
54liuyao 已提交
426

5
54liuyao 已提交
427
  int32_t c = 0;
5
54liuyao 已提交
428 429
  tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
  if (c != 0) {
5
54liuyao 已提交
430
    streamStateFreeCur(pCur);
5
54liuyao 已提交
431 432 433
    return NULL;
  }
  return pCur;
dengyihao's avatar
dengyihao 已提交
434
#endif
5
54liuyao 已提交
435 436 437
}

SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
438 439 440
#ifdef USE_ROCKSDB
  return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
5
54liuyao 已提交
441 442 443 444 445 446
  SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
  if (pCur) {
    int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
    if (code == 0) {
      return pCur;
    }
5
54liuyao 已提交
447
    streamStateFreeCur(pCur);
5
54liuyao 已提交
448 449
  }
  return NULL;
dengyihao's avatar
dengyihao 已提交
450
#endif
5
54liuyao 已提交
451 452
}

453
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
454 455 456
#ifdef USE_ROCKSDB
  return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
457 458 459 460
  if (!pCur) {
    return -1;
  }
  const SStateKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
461
  int32_t kLen;
462 463 464 465 466 467 468 469
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
470
#endif
471 472 473
}

int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
474 475 476
#ifdef USE_ROCKSDB
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
477 478 479
  if (!pCur) {
    return -1;
  }
480
  const SWinKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
481
  int32_t kLen;
482 483 484 485 486
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
    return -1;
  }
  *pKey = *pKTmp;
  return 0;
dengyihao's avatar
dengyihao 已提交
487
#endif
488 489
}

5
54liuyao 已提交
490
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
491 492 493
#ifdef USE_ROCKSDB
  return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
494 495 496
  if (!pCur) {
    return -1;
  }
5
54liuyao 已提交
497
  uint64_t groupId = pKey->groupId;
dengyihao's avatar
dengyihao 已提交
498
  int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
5
54liuyao 已提交
499 500 501 502 503 504
  if (code == 0) {
    if (pKey->groupId == groupId) {
      return 0;
    }
  }
  return -1;
dengyihao's avatar
dengyihao 已提交
505
#endif
5
54liuyao 已提交
506 507
}

508
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
509 510 511
#ifdef USE_ROCKSDB
  return streamStateGetFirst_rocksdb(pState, key);
#else
512 513 514 515
  // todo refactor
  SWinKey tmp = {.ts = 0, .groupId = 0};
  streamStatePut(pState, &tmp, NULL, 0);
  SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
dengyihao's avatar
dengyihao 已提交
516
  int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
M
Minglei Jin 已提交
517
  streamStateFreeCur(pCur);
518 519
  streamStateDel(pState, &tmp);
  return code;
dengyihao's avatar
dengyihao 已提交
520
#endif
521 522
}

523
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
524 525 526 527
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_first(pCur->iter);
  return 0;
#else
528
  return tdbTbcMoveToFirst(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
529
#endif
530 531 532
}

int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
533 534 535 536
#ifdef USE_ROCKSDB
  rocksdb_iter_seek_to_last(pCur->iter);
  return 0;
#else
537
  return tdbTbcMoveToLast(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
538
#endif
539 540
}

541
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
542 543 544
#ifdef USE_ROCKSDB
  return streamStateSeekKeyNext_rocksdb(pState, key);
#else
545 546 547 548 549
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
550
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
551
    streamStateFreeCur(pCur);
552 553 554 555
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
556
  int32_t c = 0;
557
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
5
54liuyao 已提交
558
    streamStateFreeCur(pCur);
559 560 561 562 563
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
564
    streamStateFreeCur(pCur);
565 566 567 568
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
569
#endif
570 571
}

5
54liuyao 已提交
572
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
573 574 575
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
576
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
5
54liuyao 已提交
577
  if (!pCur) {
578 579
    return NULL;
  }
5
54liuyao 已提交
580
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
581
    streamStateFreeCur(pCur);
5
54liuyao 已提交
582 583
    return NULL;
  }
584

5
54liuyao 已提交
585
  int32_t c = 0;
586
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
587
    streamStateFreeCur(pCur);
588 589 590 591 592
    return NULL;
  }
  if (c > 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
5
54liuyao 已提交
593
    streamStateFreeCur(pCur);
594 595 596 597
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
598
#endif
599 600
}

5
54liuyao 已提交
601
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
dengyihao's avatar
dengyihao 已提交
602 603 604
#ifdef USE_ROCKSDB
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
605 606 607 608
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
5
54liuyao 已提交
609
  if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
610
    streamStateFreeCur(pCur);
5
54liuyao 已提交
611 612
    return NULL;
  }
613

5
54liuyao 已提交
614
  int32_t c = 0;
615
  if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
5
54liuyao 已提交
616
    streamStateFreeCur(pCur);
617 618 619 620 621
    return NULL;
  }
  if (c < 0) return pCur;

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
5
54liuyao 已提交
622
    streamStateFreeCur(pCur);
623 624 625 626
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
627
#endif
628 629 630
}

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
631 632 633
#ifdef USE_ROCKSDB
  return streamStateCurNext_rocksdb(pState, pCur);
#else
5
54liuyao 已提交
634 635 636
  if (!pCur) {
    return -1;
  }
637 638
  //
  return tdbTbcMoveToNext(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
639
#endif
640 641 642
}

int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
dengyihao's avatar
dengyihao 已提交
643 644 645
#ifdef USE_ROCKSDB
  return streamStateCurPrev_rocksdb(pState, pCur);
#else
646 647 648
  if (!pCur) {
    return -1;
  }
649
  return tdbTbcMoveToPrev(pCur->pCur);
dengyihao's avatar
dengyihao 已提交
650
#endif
651
}
652
void streamStateFreeCur(SStreamStateCur* pCur) {
653 654 655
  if (!pCur) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
656
  qDebug("streamStateFreeCur");
dengyihao's avatar
dengyihao 已提交
657
  rocksdb_iter_destroy(pCur->iter);
dengyihao's avatar
dengyihao 已提交
658
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
dengyihao's avatar
dengyihao 已提交
659 660
  rocksdb_readoptions_destroy(pCur->readOpt);

dengyihao's avatar
dengyihao 已提交
661
  tdbTbcClose(pCur->pCur);
662 663 664
  taosMemoryFree(pCur);
}

dengyihao's avatar
dengyihao 已提交
665 666 667 668 669 670 671
void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
  taosMemoryFree(val);
#else
  tdbFree(val);
#endif
}
5
54liuyao 已提交
672 673

int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
dengyihao's avatar
dengyihao 已提交
674 675 676
#ifdef USE_ROCKSDB
  return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
5
54liuyao 已提交
677
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
5
54liuyao 已提交
678
  return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
679
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
680
#endif
5
54liuyao 已提交
681 682 683
}

int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
684 685 686 687
#ifdef USE_ROCKSDB
  return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else

688
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
dengyihao's avatar
dengyihao 已提交
689 690 691
  SSessionKey resKey = *key;
  void* tmp = NULL;
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
692
  if (code == 0) {
5
54liuyao 已提交
693 694 695 696 697 698 699
    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      *key = resKey;
      *pVal = tdbRealloc(NULL, *pVLen);
      memcpy(*pVal, tmp, *pVLen);
    }
5
54liuyao 已提交
700 701
  }
  streamStateFreeCur(pCur);
702
  return code;
dengyihao's avatar
dengyihao 已提交
703
#endif
5
54liuyao 已提交
704 705 706
}

int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
707 708 709
#ifdef USE_ROCKSDB
  return streamStateSessionDel_rocksdb(pState, key);
#else
5
54liuyao 已提交
710
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
711
  return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
712
#endif
5
54liuyao 已提交
713 714
}

715
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
716 717 718
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
5
54liuyao 已提交
719 720 721 722 723
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
724
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
725 726 727 728 729
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
730
  int32_t c = 0;
5
54liuyao 已提交
731 732 733 734
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
735
  if (c >= 0) return pCur;
5
54liuyao 已提交
736 737 738 739 740 741 742

  if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
743
#endif
5
54liuyao 已提交
744 745
}

746
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
747 748 749
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
750 751 752 753 754
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
755
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
756 757 758 759 760
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
761
  int32_t c = 0;
762 763 764 765 766 767 768 769 770 771 772 773 774
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (c <= 0) return pCur;

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
775
#endif
776 777
}

5
54liuyao 已提交
778
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
dengyihao's avatar
dengyihao 已提交
779 780 781
#ifdef USE_ROCKSDB
  return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
5
54liuyao 已提交
782 783 784 785 786
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
787
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
788 789 790 791 792
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
793
  int32_t c = 0;
5
54liuyao 已提交
794 795 796 797
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
798
  if (c < 0) return pCur;
5
54liuyao 已提交
799 800 801 802 803 804 805

  if (tdbTbcMoveToNext(pCur->pCur) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  return pCur;
dengyihao's avatar
dengyihao 已提交
806
#endif
5
54liuyao 已提交
807 808
}

809
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
810 811 812
#ifdef USE_ROCKSDB
  return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
5
54liuyao 已提交
813 814 815
  if (!pCur) {
    return -1;
  }
816
  SStateSessionKey* pKTmp = NULL;
dengyihao's avatar
dengyihao 已提交
817
  int32_t kLen;
818
  if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
5
54liuyao 已提交
819 820 821 822 823 824 825 826 827 828
    return -1;
  }
  if (pKTmp->opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
    return -1;
  }
  *pKey = pKTmp->key;
  return 0;
dengyihao's avatar
dengyihao 已提交
829
#endif
5
54liuyao 已提交
830 831 832
}

int32_t streamStateSessionClear(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
833 834 835 836
#ifdef USE_ROCKSDB
  return streamStateSessionClear_rocksdb(pState);
#else
  SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
837
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
5
54liuyao 已提交
838 839
  while (1) {
    SSessionKey delKey = {0};
dengyihao's avatar
dengyihao 已提交
840 841 842
    void* buf = NULL;
    int32_t size = 0;
    int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
5
54liuyao 已提交
843
    if (code == 0 && size > 0) {
5
54liuyao 已提交
844 845 846 847 848 849 850 851 852
      memset(buf, 0, size);
      streamStateSessionPut(pState, &delKey, buf, size);
    } else {
      break;
    }
    streamStateCurNext(pState, pCur);
  }
  streamStateFreeCur(pCur);
  return 0;
dengyihao's avatar
dengyihao 已提交
853
#endif
5
54liuyao 已提交
854 855
}

856
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
dengyihao's avatar
dengyihao 已提交
857 858 859
#ifdef USE_ROCKSDB
  return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
860 861 862 863 864
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return -1;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
865
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
866 867
    streamStateFreeCur(pCur);
    return -1;
5
54liuyao 已提交
868 869
  }

870
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
dengyihao's avatar
dengyihao 已提交
871
  int32_t c = 0;
872 873 874 875 876 877
  if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
    streamStateFreeCur(pCur);
    return -1;
  }

  SSessionKey resKey = *key;
dengyihao's avatar
dengyihao 已提交
878
  int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
    *curKey = resKey;
    streamStateFreeCur(pCur);
    return code;
  }

  if (c > 0) {
    streamStateCurNext(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
    }
  } else if (c < 0) {
    streamStateCurPrev(pState, pCur);
    code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
      *curKey = resKey;
      streamStateFreeCur(pCur);
      return code;
900 901
    }
  }
902

903
  streamStateFreeCur(pCur);
904
  return -1;
dengyihao's avatar
dengyihao 已提交
905
#endif
906 907
}

908 909
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                        int32_t* pVLen) {
dengyihao's avatar
dengyihao 已提交
910 911 912
#ifdef USE_ROCKSDB
  return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
5
54liuyao 已提交
913
  // todo refactor
dengyihao's avatar
dengyihao 已提交
914
  int32_t res = 0;
915 916 917 918 919
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;
dengyihao's avatar
dengyihao 已提交
920
  void* tmp = tdbRealloc(NULL, valSize);
921 922 923 924 925
  if (!tmp) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
926
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
927 928 929 930 931 932 933 934 935
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
    streamStateCurNext(pState, pCur);
  } else {
    *key = originKey;
5
54liuyao 已提交
936
    streamStateFreeCur(pCur);
937
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
938
  }
939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955

  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
5
54liuyao 已提交
956
  streamStateFreeCur(pCur);
957
  return res;
dengyihao's avatar
dengyihao 已提交
958 959

#endif
5
54liuyao 已提交
960 961 962 963 964
}

int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  // todo refactor
dengyihao's avatar
dengyihao 已提交
965 966 967 968 969

#ifdef USE_ROCKSDB
  return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
  int32_t res = 0;
5
54liuyao 已提交
970
  SSessionKey tmpKey = *key;
dengyihao's avatar
dengyihao 已提交
971 972
  int32_t valSize = *pVLen;
  void* tmp = tdbRealloc(NULL, valSize);
5
54liuyao 已提交
973 974 975 976
  if (!tmp) {
    return -1;
  }

977
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
dengyihao's avatar
dengyihao 已提交
978
  int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
979 980 981
  if (code == 0) {
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
      memcpy(tmp, *pVal, valSize);
982
      streamStateSessionDel(pState, key);
983 984
      goto _end;
    }
5
54liuyao 已提交
985 986 987 988

    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
989
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
990 991
      goto _end;
    }
5
54liuyao 已提交
992 993 994 995 996 997

    streamStateCurNext(pState, pCur);
  } else {
    *key = tmpKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext(pState, key);
5
54liuyao 已提交
998 999
  }

1000
  code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
1001
  if (code == 0) {
5
54liuyao 已提交
1002 1003 1004
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
    if (fn(pKeyData, stateKey) == true) {
      memcpy(tmp, *pVal, valSize);
1005
      streamStateSessionDel(pState, key);
5
54liuyao 已提交
1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
      goto _end;
    }
  }

  *key = tmpKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:

  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
dengyihao's avatar
dengyihao 已提交
1019
#endif
5
54liuyao 已提交
1020
}
5
54liuyao 已提交
1021

1022
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
dengyihao's avatar
dengyihao 已提交
1023
  qWarn("try to write to cf parname");
dengyihao's avatar
dengyihao 已提交
1024 1025 1026
#ifdef USE_ROCKSDB
  return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
L
Liu Jicong 已提交
1027 1028
  return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
                     pState->pTdbState->txn);
dengyihao's avatar
dengyihao 已提交
1029
#endif
1030 1031 1032
}

int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
dengyihao's avatar
dengyihao 已提交
1033 1034 1035
#ifdef USE_ROCKSDB
  return streamStateGetParName_rocksdb(pState, groupId, pVal);
#else
1036
  int32_t len;
5
54liuyao 已提交
1037
  return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
dengyihao's avatar
dengyihao 已提交
1038
#endif
5
54liuyao 已提交
1039 1040 1041
}

void streamStateDestroy(SStreamState* pState) {
dengyihao's avatar
dengyihao 已提交
1042 1043 1044 1045
#ifdef USE_ROCKSDB
  streamStateDestroy_rocksdb(pState);
  // do nothong
#endif
5
54liuyao 已提交
1046 1047
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
1048 1049
}

5
54liuyao 已提交
1050 1051 1052 1053 1054 1055 1056
#if 0
char* streamStateSessionDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
5
54liuyao 已提交
1057
  if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
5
54liuyao 已提交
1058 1059 1060 1061 1062 1063
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SSessionKey key = {0};
1064 1065 1066
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
5
54liuyao 已提交
1067
  if (code != 0) {
1068
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
  len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SSessionKey){0};
    code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
1083
      streamStateFreeCur(pCur);
5
54liuyao 已提交
1084 1085 1086 1087 1088 1089
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
    len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
1090
  streamStateFreeCur(pCur);
5
54liuyao 已提交
1091 1092
  return dumpBuf;
}
5
54liuyao 已提交
1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135

char* streamStateIntervalDump(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  tdbTbcMoveToFirst(pCur->pCur);

  SWinKey key = {0};
  void*       buf = NULL;
  int32_t     bufSize = 0;
  int32_t     code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
  if (code != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  int32_t size = 2048;
  char*   dumpBuf = taosMemoryCalloc(size, 1);
  int64_t len = 0;
  len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
  // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
  len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  while (1) {
    tdbTbcMoveToNext(pCur->pCur);
    key = (SWinKey){0};
    code = streamStateGetKVByCur(pCur, &key, NULL, 0);
    if (code != 0) {
      streamStateFreeCur(pCur);
      return dumpBuf;
    }
    len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
    // len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
    len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
  }
  streamStateFreeCur(pCur);
  return dumpBuf;
}
5
54liuyao 已提交
1136
#endif