streamBackendRocksdb.c 65.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"

20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// State object type for the stream compaction-filter factory. It carries a
// single opaque pointer; the code visible in this file never reads or writes it
// (the factory is created with a NULL state).
typedef struct SCompactFilteFactory {
  void* status;
} SCompactFilteFactory;

// Callbacks handed to the RocksDB C API for compaction filtering. The first
// argument of each is the opaque state pointer given at creation time.
void          destroyCompactFilteFactory(void* arg);
void          destroyCompactFilte(void* arg);
const char*   compactFilteFactoryName(void* arg);
const char*   compactFilteName(void* arg);
// Returns non-zero when the entry should be dropped during compaction.
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                           char** newval, size_t* newvlen, unsigned char* value_changed);
// Creates one compaction filter instance.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);

// Per-column-family extras that must be released separately from the column
// family options: tableOpt holds the block-based table options object.
typedef struct {
  void* tableOpt;
} RocksdbCfParam;
// Column family name suffixes, in the same order as the ginitDict entries.
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};

// Key codec callbacks: serialize `key` into `buf` / parse `buf` into `key`.
typedef int (*EncodeFunc)(void* key, char* buf);
typedef int (*DecodeFunc)(void* key, char* buf);
// Writes a printable form of `key` into `buf` (debugging aid).
typedef int (*ToStringFunc)(void* key, char* buf);
// Returns the comparator name reported to RocksDB.
typedef const char* (*CompareName)(void* statue);
// Orders two encoded keys; negative, zero or positive like memcmp.
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
// Releases the comparator state.
typedef void (*DestroyFunc)(void* state);
// Value codec callbacks: wrap/unwrap user data together with its ttl.
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);

// Comparator name callbacks, one per column family kind; each returns the key
// of the matching ginitDict entry.
const char* compareDefaultName(void* name);
const char* compareStateName(void* name);
const char* compareWinKeyName(void* name);
const char* compareSessionKeyName(void* name);
const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);

/**
 * Create the shared RocksDB backend used by stream state.
 *
 * Builds a default env (4 low-priority / 2 high-priority background threads),
 * a 128 MiB LRU block cache and the DB options (create-if-missing, 128 MiB
 * write buffer, 128 MiB max WAL, stream compaction filter factory), then opens
 * the database at `path`.
 *
 * @param path  directory of the RocksDB instance
 * @return an SBackendHandle* (as void*) on success; NULL if the handle cannot
 *         be allocated or the database cannot be opened. On failure everything
 *         created here is released again.
 */
void* streamBackendInit(const char* path) {
  // Allocate with the same allocator family streamBackendCleanup frees with.
  SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
  if (pHandle == NULL) {
    qError("failed to open rocksdb, path:%s, reason:%s", path, "out of memory");
    return NULL;
  }
  pHandle->list = tdListNew(sizeof(SCfComparator));
  taosThreadMutexInit(&pHandle->mutex, NULL);

  rocksdb_env_t* env = rocksdb_create_default_env();  // rocksdb_envoptions_create();
  rocksdb_env_set_low_priority_background_threads(env, 4);
  rocksdb_env_set_high_priority_background_threads(env, 2);

  rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);

  rocksdb_options_t* opts = rocksdb_options_create();
  rocksdb_options_set_env(opts, env);
  rocksdb_options_set_create_if_missing(opts, 1);
  rocksdb_options_set_create_missing_column_families(opts, 1);
  rocksdb_options_set_write_buffer_size(opts, 128 << 20);
  rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
  rocksdb_options_set_recycle_log_file_num(opts, 6);
  rocksdb_options_set_max_write_buffer_number(opts, 3);

  pHandle->env = env;
  pHandle->dbOpt = opts;
  pHandle->cache = cache;
  pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);

  char* err = NULL;
  pHandle->db = rocksdb_open(opts, path, &err);
  if (err != NULL) {
    qError("failed to open rocksdb, path:%s, reason:%s", path, err);
    taosMemoryFreeClear(err);
    // Do not hand out a handle whose db is NULL; every later call would
    // dereference it.
    goto _EXIT;
  }

  return (void*)pHandle;

_EXIT:
  // The filter factory is not destroyed explicitly: the options object took
  // ownership of it in rocksdb_options_set_compaction_filter_factory, so
  // destroying both would free it twice (streamBackendCleanup skips it too).
  rocksdb_options_destroy(opts);
  rocksdb_cache_destroy(cache);
  rocksdb_env_destroy(env);
  taosThreadMutexDestroy(&pHandle->mutex);
  tdListFree(pHandle->list);
  taosMemoryFree(pHandle);
  return NULL;
}
void streamBackendCleanup(void* arg) {
dengyihao's avatar
dengyihao 已提交
101 102 103 104 105 106 107 108 109 110
  SBackendHandle*         pHandle = (SBackendHandle*)arg;
  rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
  char*                   err = NULL;
  rocksdb_flush(pHandle->db, flushOpt, &err);
  if (err != NULL) {
    qError("failed to flush db before streamBackend clean up, reason:%s", err);
    taosMemoryFree(err);
  }
  rocksdb_flushoptions_destroy(flushOpt);

111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
  rocksdb_close(pHandle->db);
  rocksdb_options_destroy(pHandle->dbOpt);
  rocksdb_env_destroy(pHandle->env);
  rocksdb_cache_destroy(pHandle->cache);

  taosThreadMutexDestroy(&pHandle->mutex);
  SListNode* head = tdListPopHead(pHandle->list);
  while (head != NULL) {
    streamStateDestroyCompar(head->data);
    taosMemoryFree(head);
    head = tdListPopHead(pHandle->list);
  }
  // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
  tdListFree(pHandle->list);

  taosMemoryFree(pHandle);

  return;
}
// Register a comparator set with the backend. The list is modified under the
// handle's mutex; the node returned by tdListAdd is handed back to the caller.
SListNode* streamBackendAddCompare(void* backend, void* arg) {
  SBackendHandle* owner = (SBackendHandle*)backend;

  taosThreadMutexLock(&owner->mutex);
  SListNode* added = tdListAdd(owner->list, arg);
  taosThreadMutexUnlock(&owner->mutex);

  return added;
}
// Unregister a comparator node from the backend and destroy it. The node is
// detached under the handle's mutex and released after the lock is dropped.
void streamBackendDelCompare(void* backend, void* arg) {
  SBackendHandle* owner = (SBackendHandle*)backend;

  taosThreadMutexLock(&owner->mutex);
  SListNode* detached = tdListPopNode(owner->list, arg);
  taosThreadMutexUnlock(&owner->mutex);

  if (!detached) {
    return;
  }
  streamStateDestroyCompar(detached->data);
  taosMemoryFree(detached);
}
// Thin wrapper: destroying a RocksDB-backed stream state closes its backend,
// dropping the column families when `remove` is true.
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) {
  streamStateCloseBackend(pState, remove);
}
dengyihao's avatar
dengyihao 已提交
150 151 152 153 154 155 156 157 158
// Positions `iter` at the key in buf[0..len); reports whether it ended up on a
// valid entry.
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
// Maps a column family name to its index in ginitDict, or -1 if unknown.
int         streamGetInit(const char* funcName);

// Stored record layout:
// |key|-----value------|
// |key|ttl|len|userData|
// (ttl is an int64 expiry timestamp, len an int32 byte count of userData)

// Creates an iterator over the column family named `cfName`.
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
                                                 rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);

dengyihao's avatar
dengyihao 已提交
159
/**
 * Comparator for the "default" column family: plain byte-wise ordering.
 *
 * Only the bytes both keys actually have are compared; if they are equal the
 * shorter key sorts first. (Comparing aLen bytes unconditionally would read
 * past the end of bBuf whenever aLen > bLen.)
 *
 * @return <0, 0 or >0 when a sorts before, equal to, or after b
 */
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  (void)state;

  size_t common = aLen < bLen ? aLen : bLen;
  int    ret = memcmp(aBuf, bBuf, common);
  if (ret != 0) {
    return ret;
  }

  if (aLen < bLen) {
    return -1;
  }
  if (aLen > bLen) {
    return 1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
172 173 174
/**
 * Tell whether an encoded value has expired.
 *
 * Decodes the leading int64 of `vv` as the expiry timestamp. A timestamp of 0
 * means "never expires".
 *
 * @return 1 if the timestamp is non-zero and earlier than the current time in
 *         milliseconds, otherwise 0
 */
int streamStateValueIsStale(char* vv) {
  int64_t ts = 0;
  taosDecodeFixedI64(vv, &ts);
  if (ts == 0) {
    return 0;
  }
  return ts < taosGetTimestampMs() ? 1 : 0;
}
dengyihao's avatar
dengyihao 已提交
177
/**
 * Tell whether the value the iterator currently points at has expired.
 *
 * @return 1 if stale, 0 otherwise (see streamStateValueIsStale)
 */
int iterValueIsStale(rocksdb_iterator_t* iter) {
  size_t len = 0;
  char*  v = (char*)rocksdb_iter_value(iter, &len);
  return streamStateValueIsStale(v);
}
dengyihao's avatar
dengyihao 已提交
182 183 184 185 186 187 188 189 190 191 192 193 194 195
// Encode a default key: `k` is a NUL-terminated string whose characters are
// copied into `buf` without the terminator. Returns the number of bytes copied.
int defaultKeyEncode(void* k, char* buf) {
  const char* src = (const char*)k;
  int         n = strlen(src);

  memcpy(buf, src, n);
  return n;
}
// Decode a default key: copies the characters of the NUL-terminated string in
// `buf` into `k` (terminator not copied). Returns the number of bytes copied.
int defaultKeyDecode(void* k, char* buf) {
  int n = strlen(buf);

  memcpy(k, buf, n);
  return n;
}
// Debug helper: renders a default (string) key as "key: <text>" into `buf` and
// returns the number of characters written.
int defaultKeyToString(void* k, char* buf) {
  char* text = (char*)k;
  int   written = sprintf(buf, "key: %s", text);
  return written;
}
dengyihao's avatar
dengyihao 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
//
//  SStateKey encoded layout
//  |--groupId--|----ts-----|--opNum----|
//  |--uint64_t-|--int64_t--|--int64_t--|
//
// Comparator for the "state" column family: decodes both encoded keys and
// delegates the ordering to stateKeyCmpr.
//
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  SStateKey lhs, rhs;
  memset(&lhs, 0, sizeof(lhs));
  memset(&rhs, 0, sizeof(rhs));

  // Decode the left key completely, then the right one.
  char* cur = (char*)aBuf;
  cur = taosDecodeFixedU64(cur, &lhs.key.groupId);
  cur = taosDecodeFixedI64(cur, &lhs.key.ts);
  taosDecodeFixedI64(cur, &lhs.opNum);

  cur = (char*)bBuf;
  cur = taosDecodeFixedU64(cur, &rhs.key.groupId);
  cur = taosDecodeFixedI64(cur, &rhs.key.ts);
  taosDecodeFixedI64(cur, &rhs.opNum);

  return stateKeyCmpr(&lhs, sizeof(lhs), &rhs, sizeof(rhs));
}

int stateKeyEncode(void* k, char* buf) {
  SStateKey* key = k;
  int        len = 0;
  len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
  len += taosEncodeFixedI64((void**)&buf, key->key.ts);
dengyihao's avatar
dengyihao 已提交
228
  len += taosEncodeFixedI64((void**)&buf, key->opNum);
dengyihao's avatar
dengyihao 已提交
229 230 231 232 233 234 235 236 237 238 239 240
  return len;
}
// Parse an SStateKey (groupId, ts, opNum) out of `buf` into `k`.
// Returns the number of bytes consumed.
int stateKeyDecode(void* k, char* buf) {
  SStateKey* out = k;
  char*      cursor = buf;

  cursor = taosDecodeFixedU64(cursor, &out->key.groupId);
  cursor = taosDecodeFixedI64(cursor, &out->key.ts);
  cursor = taosDecodeFixedI64(cursor, &out->opNum);

  return cursor - buf;
}

dengyihao's avatar
dengyihao 已提交
241 242 243
/**
 * Debug helper: render an SStateKey as "[groupId:..,ts:..,opNum:..]".
 *
 * groupId is encoded/decoded as an unsigned 64-bit value, so it is printed
 * with PRIu64 (as stateSessionKeyToString does) rather than a signed format.
 *
 * @return number of characters written to `buf`
 */
int stateKeyToString(void* k, char* buf) {
  SStateKey* key = k;
  int        n = 0;
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
  return n;
}

dengyihao's avatar
dengyihao 已提交
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
//
// SStateSessionKey encoded layout
//  |-----------SSessionKey----------|
//  |-----STimeWindow-----|
//  |---skey--|---ekey----|--groupId-|--opNum--|
//  |---int64-|--int64_t--|--uint64--|--int64_t|
//
// Comparator for the "sess" column family: decodes both encoded keys and
// delegates the ordering to stateSessionKeyCmpr.
//
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  SStateSessionKey lhs, rhs;
  memset(&lhs, 0, sizeof(lhs));
  memset(&rhs, 0, sizeof(rhs));

  // Left key first, then the right key.
  char* cur = (char*)aBuf;
  cur = taosDecodeFixedI64(cur, &lhs.key.win.skey);
  cur = taosDecodeFixedI64(cur, &lhs.key.win.ekey);
  cur = taosDecodeFixedU64(cur, &lhs.key.groupId);
  cur = taosDecodeFixedI64(cur, &lhs.opNum);

  cur = (char*)bBuf;
  cur = taosDecodeFixedI64(cur, &rhs.key.win.skey);
  cur = taosDecodeFixedI64(cur, &rhs.key.win.ekey);
  cur = taosDecodeFixedU64(cur, &rhs.key.groupId);
  cur = taosDecodeFixedI64(cur, &rhs.opNum);

  return stateSessionKeyCmpr(&lhs, sizeof(lhs), &rhs, sizeof(rhs));
}
// Serialize an SStateSessionKey into `buf` as skey, ekey, groupId, opNum.
// Returns the sum of the lengths reported by the fixed-width encoders.
int stateSessionKeyEncode(void* ses, char* buf) {
  SStateSessionKey* src = ses;
  void**            out = (void**)&buf;

  int total = taosEncodeFixedI64(out, src->key.win.skey);
  total += taosEncodeFixedI64(out, src->key.win.ekey);
  total += taosEncodeFixedU64(out, src->key.groupId);
  total += taosEncodeFixedI64(out, src->opNum);
  return total;
}
// Parse an SStateSessionKey (skey, ekey, groupId, opNum) out of `buf` into
// `ses`. Returns the number of bytes consumed.
int stateSessionKeyDecode(void* ses, char* buf) {
  SStateSessionKey* out = ses;
  char*             cursor = buf;

  cursor = taosDecodeFixedI64(cursor, &out->key.win.skey);
  cursor = taosDecodeFixedI64(cursor, &out->key.win.ekey);
  cursor = taosDecodeFixedU64(cursor, &out->key.groupId);
  cursor = taosDecodeFixedI64(cursor, &out->opNum);

  return cursor - buf;
}
dengyihao's avatar
dengyihao 已提交
300 301 302
/**
 * Debug helper: render an SStateSessionKey as
 * "[skey:..,ekey:..,groupId:..,opNum:..]".
 *
 * @return number of characters written to `buf`
 */
int stateSessionKeyToString(void* k, char* buf) {
  SStateSessionKey* key = k;
  int               n = 0;
  n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
  n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
  n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
  return n;
}
dengyihao's avatar
dengyihao 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

/**
 *  SWinKey encoded layout
 *  |------groupId------|-----ts------|
 *  |------uint64-------|----int64----|
 *
 *  Comparator for the "fill" column family: decodes both encoded keys and
 *  delegates the ordering to winKeyCmpr.
 */
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  SWinKey lhs, rhs;
  memset(&lhs, 0, sizeof(lhs));
  memset(&rhs, 0, sizeof(rhs));

  char* cur = (char*)aBuf;
  cur = taosDecodeFixedU64(cur, &lhs.groupId);
  cur = taosDecodeFixedI64(cur, &lhs.ts);

  cur = (char*)bBuf;
  cur = taosDecodeFixedU64(cur, &rhs.groupId);
  cur = taosDecodeFixedI64(cur, &rhs.ts);

  return winKeyCmpr(&lhs, sizeof(lhs), &rhs, sizeof(rhs));
}

// Serialize an SWinKey into `buf` as groupId (u64) followed by ts (i64).
// Returns the sum of the lengths reported by the fixed-width encoders.
int winKeyEncode(void* k, char* buf) {
  SWinKey* src = k;
  void**   out = (void**)&buf;

  int total = taosEncodeFixedU64(out, src->groupId);
  total += taosEncodeFixedI64(out, src->ts);
  return total;
}

/**
 * Parse an SWinKey (groupId, ts) out of `buf` into `k`.
 *
 * @return number of bytes consumed, like the other *KeyDecode helpers
 *         (previously this always returned 0).
 */
int winKeyDecode(void* k, char* buf) {
  SWinKey* key = k;
  char*    p = buf;
  p = taosDecodeFixedU64(p, &key->groupId);
  p = taosDecodeFixedI64(p, &key->ts);
  return p - buf;
}
dengyihao's avatar
dengyihao 已提交
348 349 350 351

int winKeyToString(void* k, char* buf) {
  SWinKey* key = k;
  int      n = 0;
dengyihao's avatar
dengyihao 已提交
352 353
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
  n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
dengyihao's avatar
dengyihao 已提交
354 355
  return n;
}
dengyihao's avatar
dengyihao 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
/*
 * STupleKey encoded layout
 * |---groupId---|---ts---|---exprIdx---|
 * |---uint64--|---int64--|---int32-----|
 *
 * Comparator for the "func" column family: decodes both encoded keys and
 * delegates the ordering to STupleKeyCmpr.
 */
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  STupleKey lhs, rhs;
  memset(&lhs, 0, sizeof(lhs));
  memset(&rhs, 0, sizeof(rhs));

  char* cur = (char*)aBuf;
  cur = taosDecodeFixedU64(cur, &lhs.groupId);
  cur = taosDecodeFixedI64(cur, &lhs.ts);
  cur = taosDecodeFixedI32(cur, &lhs.exprIdx);

  cur = (char*)bBuf;
  cur = taosDecodeFixedU64(cur, &rhs.groupId);
  cur = taosDecodeFixedI64(cur, &rhs.ts);
  cur = taosDecodeFixedI32(cur, &rhs.exprIdx);

  return STupleKeyCmpr(&lhs, sizeof(lhs), &rhs, sizeof(rhs));
}

// Serialize an STupleKey into `buf` as groupId (u64), ts (i64), exprIdx (i32).
// Returns the sum of the lengths reported by the fixed-width encoders.
int tupleKeyEncode(void* k, char* buf) {
  STupleKey* src = k;
  void**     out = (void**)&buf;

  int total = taosEncodeFixedU64(out, src->groupId);
  total += taosEncodeFixedI64(out, src->ts);
  total += taosEncodeFixedI32(out, src->exprIdx);
  return total;
}
/**
 * Parse an STupleKey (groupId, ts, exprIdx) out of `buf` into `k`.
 *
 * @return number of bytes consumed, like the other *KeyDecode helpers
 *         (previously this always returned 0).
 */
int tupleKeyDecode(void* k, char* buf) {
  STupleKey* key = k;
  char*      p = buf;
  p = taosDecodeFixedU64(p, &key->groupId);
  p = taosDecodeFixedI64(p, &key->ts);
  p = taosDecodeFixedI32(p, &key->exprIdx);
  return p - buf;
}
dengyihao's avatar
dengyihao 已提交
398 399 400
int tupleKeyToString(void* k, char* buf) {
  int        n = 0;
  STupleKey* key = k;
dengyihao's avatar
dengyihao 已提交
401 402 403
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
  n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
dengyihao's avatar
dengyihao 已提交
404 405
  return n;
}
dengyihao's avatar
dengyihao 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433

// Comparator for the "parname"/"partag" column families: each key is a single
// encoded int64 group id, ordered numerically.
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  int64_t lhs = 0;
  int64_t rhs = 0;

  taosDecodeFixedI64((char*)aBuf, &lhs);
  taosDecodeFixedI64((char*)bBuf, &rhs);

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}
// Serialize the int64 group id pointed to by `k` into `buf`.
// Returns the length reported by the encoder.
int parKeyEncode(void* k, char* buf) {
  int64_t gid = *(int64_t*)k;
  return taosEncodeFixedI64((void**)&buf, gid);
}
// Parse an int64 group id out of `buf` into the int64_t pointed to by `k`.
// Returns the number of bytes consumed.
int parKeyDecode(void* k, char* buf) {
  int64_t* out = k;
  char*    end = taosDecodeFixedI64(buf, out);
  return end - buf;
}
dengyihao's avatar
dengyihao 已提交
434 435 436
/**
 * Debug helper: render an int64 group-id key as "[groupId:<n>]".
 *
 * @param k    pointer to the int64_t key
 * @param buf  destination buffer
 * @return number of characters written to `buf`
 */
int parKeyToString(void* k, char* buf) {
  int64_t* key = k;
  return sprintf(buf, "[groupId:%" PRIi64 "]", *key);
}
dengyihao's avatar
dengyihao 已提交
440
int stremaValueEncode(void* k, char* buf) {
dengyihao's avatar
dengyihao 已提交
441 442
  int           len = 0;
  SStreamValue* key = k;
dengyihao's avatar
dengyihao 已提交
443 444 445 446 447 448
  len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
  len += taosEncodeFixedI32((void**)&buf, key->len);
  len += taosEncodeBinary((void**)&buf, key->data, key->len);
  return len;
}
int streamValueDecode(void* k, char* buf) {
dengyihao's avatar
dengyihao 已提交
449 450
  SStreamValue* key = k;
  char*         p = buf;
dengyihao's avatar
dengyihao 已提交
451 452 453 454 455 456
  p = taosDecodeFixedI64(p, &key->unixTimestamp);
  p = taosDecodeFixedI32(p, &key->len);
  p = taosDecodeBinary(p, (void**)&key->data, key->len);
  return p - buf;
}
int32_t streamValueToString(void* k, char* buf) {
dengyihao's avatar
dengyihao 已提交
457 458
  SStreamValue* key = k;
  int           n = 0;
dengyihao's avatar
dengyihao 已提交
459 460 461 462 463 464
  n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
  n += sprintf(buf + n, "len:%d,", key->len);
  n += sprintf(buf + n, "data:%s]", key->data);
  return n;
}

dengyihao's avatar
dengyihao 已提交
465
/*1: stale, 0: no stale*/
dengyihao's avatar
dengyihao 已提交
466
int32_t streaValueIsStale(void* k, int64_t ts) {
dengyihao's avatar
dengyihao 已提交
467
  SStreamValue* key = k;
dengyihao's avatar
dengyihao 已提交
468 469 470 471 472
  if (key->unixTimestamp < ts) {
    return 1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
473

dengyihao's avatar
dengyihao 已提交
474 475 476 477
// Comparator state destructor handed to RocksDB; there is no state to release,
// so the argument is ignored.
void destroyFunc(void* arg) { (void)arg; }
dengyihao's avatar
dengyihao 已提交
478

dengyihao's avatar
dengyihao 已提交
479
typedef struct {
dengyihao's avatar
dengyihao 已提交
480 481 482 483 484 485 486 487 488 489 490
  const char*     key;
  int32_t         len;
  int             idx;
  BackendCmpFunc  cmpFunc;
  EncodeFunc      enFunc;
  DecodeFunc      deFunc;
  ToStringFunc    toStrFunc;
  CompareName     cmpName;
  DestroyFunc     detroyFunc;
  EncodeValueFunc enValueFunc;
  DecodeValueFunc deValueFunc;
dengyihao's avatar
dengyihao 已提交
491

dengyihao's avatar
dengyihao 已提交
492 493
} SCfInit;

dengyihao's avatar
dengyihao 已提交
494
// Writes "<idstr>_<SUBFIX>" into `name`. The write is an unbounded sprintf, so
// `name` must be large enough. The expansion already ends with ';'.
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX));
dengyihao's avatar
dengyihao 已提交
495

dengyihao's avatar
dengyihao 已提交
496 497 498 499 500 501 502 503 504 505 506 507
/**
 * Wrap user data into the stored value format: ttl (i64), vlen (i32), then
 * `vlen` bytes of data.
 *
 * @param value  user data
 * @param vlen   number of bytes in `value`
 * @param ttl    expiry timestamp stored in front of the data
 * @param dest   receives a newly allocated buffer holding the encoded value;
 *               set to NULL if the allocation fails
 * @return number of bytes written, or -1 if the buffer cannot be allocated
 */
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
  char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + vlen);
  if (p == NULL) {
    *dest = NULL;
    return -1;
  }

  char*   buf = p;
  int32_t len = 0;
  len += taosEncodeFixedI64((void**)&buf, ttl);
  len += taosEncodeFixedI32((void**)&buf, vlen);
  len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
  *dest = p;
  return len;
}
dengyihao's avatar
dengyihao 已提交
508 509 510 511
/*
 *  ret >= 0 : found valid value
 *  ret < 0 : error or timeout
 */
dengyihao's avatar
dengyihao 已提交
512 513 514
/**
 * Unwrap a stored value (ttl, len, data).
 *
 * @param value  encoded value
 * @param vlen   length of `value` (not used)
 * @param ttl    if non-NULL, receives the remaining time to live in ms
 *               (0 when the value has no expiry)
 * @param dest   if non-NULL, receives the decoded data (NULL when the data is
 *               empty or the value is stale); if NULL the decoded data is
 *               freed here
 * @return the data length (>= 0) for a live value, -1 if the value is stale
 */
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
  (void)vlen;

  SStreamValue key = {0};
  char*        p = value;
  if (streamStateValueIsStale(p)) {
    // dest is optional (see below), so it must be checked on this path too.
    if (dest != NULL) {
      *dest = NULL;
    }
    return -1;
  }

  int64_t now = taosGetTimestampMs();
  p = taosDecodeFixedI64(p, &key.unixTimestamp);
  p = taosDecodeFixedI32(p, &key.len);
  if (key.len == 0) {
    key.data = NULL;
  } else {
    p = taosDecodeBinary(p, (void**)&(key.data), key.len);
  }

  if (ttl != NULL) {
    *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
  }
  if (dest != NULL) {
    *dest = key.data;
  } else {
    taosMemoryFree(key.data);
  }
  return key.len;
}
dengyihao's avatar
dengyihao 已提交
538
SCfInit ginitDict[] = {
dengyihao's avatar
dengyihao 已提交
539
    {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
dengyihao's avatar
dengyihao 已提交
540 541 542 543 544
     destroyFunc, encodeValueFunc, decodeValueFunc},
    {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
     encodeValueFunc, decodeValueFunc},
    {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
     encodeValueFunc, decodeValueFunc},
dengyihao's avatar
dengyihao 已提交
545
    {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
dengyihao's avatar
dengyihao 已提交
546 547 548 549 550 551 552
     compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
    {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
     encodeValueFunc, decodeValueFunc},
    {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
     encodeValueFunc, decodeValueFunc},
    {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
     encodeValueFunc, decodeValueFunc},
dengyihao's avatar
dengyihao 已提交
553 554
};

dengyihao's avatar
dengyihao 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
const char* compareDefaultName(void* arg) {
  (void)arg;
  return ginitDict[0].key;
}
const char* compareStateName(void* arg) {
  (void)arg;
  return ginitDict[1].key;
}
const char* compareWinKeyName(void* arg) {
  (void)arg;
  return ginitDict[2].key;
}
const char* compareSessionKeyName(void* arg) {
  (void)arg;
  return ginitDict[3].key;
}
const char* compareFuncKeyName(void* arg) {
  (void)arg;
  return ginitDict[4].key;
}
const char* compareParKeyName(void* arg) {
  (void)arg;
  return ginitDict[5].key;
}
const char* comparePartagKeyName(void* arg) {
  (void)arg;
  return ginitDict[6].key;
}
dengyihao's avatar
dengyihao 已提交
583

dengyihao's avatar
dengyihao 已提交
584 585 586 587 588 589 590 591
// Destructor callback for the compaction filter factory state. Nothing is
// released, whether or not a state pointer is supplied.
void destroyCompactFilteFactory(void* arg) {
  (void)arg;
}
/**
 * Name callback of the stream compaction filter factory.
 *
 * @param arg  factory state (unused)
 * @return the constant string "stream_compact_filter"
 */
const char* compactFilteFactoryName(void* arg) {
  (void)arg;
  return "stream_compact_filter";
}

dengyihao's avatar
dengyihao 已提交
592
// Destructor callback for a compaction filter instance; there is no state to
// release.
void destroyCompactFilte(void* arg) {
  (void)arg;
}
dengyihao's avatar
dengyihao 已提交
593 594
/**
 * Compaction filter callback: drop entries whose ttl has expired.
 *
 * Only `val`/`vlen` are inspected; the value is never rewritten, so newval,
 * newvlen and value_changed are left untouched.
 *
 * @return 1 to remove the entry (stale), 0 to keep it
 */
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                           char** newval, size_t* newvlen, unsigned char* value_changed) {
  (void)arg;
  (void)level;
  (void)key;
  (void)klen;
  (void)newval;
  (void)newvlen;
  (void)value_changed;

  // A value too short to hold the int64 expiry prefix cannot be decoded; keep
  // it rather than read past its end.
  if (val == NULL || vlen < sizeof(int64_t)) {
    return 0;
  }
  return streamStateValueIsStale((char*)val) ? 1 : 0;
}
// Name callback of the stream compaction filter; always "stream_filte".
const char* compactFilteName(void* arg) {
  (void)arg;
  return "stream_filte";
}

/**
 * Factory callback: create one stream compaction filter.
 *
 * The filter is created without state and wired to destroyCompactFilte,
 * compactFilte and compactFilteName.
 *
 * @param arg  factory state (unused)
 * @param ctx  compaction context (unused)
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  (void)arg;
  (void)ctx;
  return rocksdb_compactionfilter_create(NULL, destroyCompactFilte, compactFilte, compactFilteName);
}
dengyihao's avatar
dengyihao 已提交
616

dengyihao's avatar
dengyihao 已提交
617
/**
 * Open the per-task column families of a stream state inside the shared backend.
 *
 * For every entry of ginitDict this copies the backend's DB options, attaches a
 * block-based table factory (shared block cache + bloom filter) and the entry's
 * comparator, then creates the column family "<idstr>_<key>", where idstr is
 * "<streamId>-<taskId>". Handles and options are stored in pState->pTdbState
 * and the comparator array is registered with the backend. Writes for this
 * state are configured with the WAL disabled.
 *
 * @param backend  handle returned by streamBackendInit (SBackendHandle*)
 * @param pState   stream state whose pTdbState is filled in
 * @return 0 on completion; -1 if the bookkeeping arrays cannot be allocated.
 *         NOTE(review): a failed column-family creation is only logged and the
 *         function still returns 0 — confirm whether callers rely on that.
 */
int streamStateOpenBackend(void* backend, SStreamState* pState) {
  qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId);
  SBackendHandle* handle = backend;

  sprintf(pState->pTdbState->idstr, "%d-%d", pState->streamId, pState->taskId);
  char* err = NULL;
  int   cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);

  // Allocate all bookkeeping arrays up front so a failure can be handled
  // before any RocksDB object has been created.
  RocksdbCfParam*                  param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
  const rocksdb_options_t**        cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
  rocksdb_comparator_t**           pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
  rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
  if (param == NULL || cfOpt == NULL || pCompare == NULL || cfHandle == NULL) {
    qError("failed to open backend for %s, reason:%s", pState->pTdbState->idstr, "out of memory");
    if (param != NULL) taosMemoryFree(param);
    if (cfOpt != NULL) taosMemoryFree((void*)cfOpt);
    if (pCompare != NULL) taosMemoryFree(pCompare);
    if (cfHandle != NULL) taosMemoryFree(cfHandle);
    return -1;
  }

  for (int i = 0; i < cfLen; i++) {
    cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
    // refactor later
    rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
    rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);

    rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
    rocksdb_block_based_options_set_filter_policy(tableOpt, filter);

    rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);

    // Kept so streamStateCloseBackend can destroy the table options.
    param[i].tableOpt = tableOpt;
  }

  for (int i = 0; i < cfLen; i++) {
    SCfInit* cf = &ginitDict[i];

    rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName);
    rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
    pCompare[i] = compare;
  }

  for (int i = 0; i < cfLen; i++) {
    char buf[64] = {0};
    // Bounded equivalent of GEN_COLUMN_FAMILY_NAME: "<idstr>_<key>".
    snprintf(buf, sizeof(buf), "%s_%s", pState->pTdbState->idstr, ginitDict[i].key);
    cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err);
    if (err != NULL) {
      qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
      taosMemoryFreeClear(err);
      // return -1;
    }
  }

  pState->pTdbState->rocksdb = handle->db;
  pState->pTdbState->pHandle = cfHandle;
  pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
  pState->pTdbState->readOpts = rocksdb_readoptions_create();
  pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
  pState->pTdbState->dbOpt = handle->dbOpt;
  pState->pTdbState->param = param;
  pState->pTdbState->pBackendHandle = handle;

  SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
  pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
  rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
  return 0;
}
674

dengyihao's avatar
dengyihao 已提交
675
/**
 * Close (or drop) the column families opened by streamStateOpenBackend.
 *
 * With `remove` set every column family is dropped and the state's comparator
 * set is unregistered from the backend; otherwise every column family is
 * flushed. In both cases the handles, per-family options, table options and
 * read/write options are destroyed, and pTdbState->rocksdb is reset to NULL so
 * a second call returns early.
 *
 * @param pState  stream state to close
 * @param remove  true to drop the column families, false to just flush them
 */
void streamStateCloseBackend(SStreamState* pState, bool remove) {
  char* status[] = {"close", "drop"};
  qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId);
  if (pState->pTdbState->rocksdb == NULL) {
    return;
  }

  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);

  char* err = NULL;
  if (remove) {
    for (int i = 0; i < cfLen; i++) {
      rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err);
      if (err != NULL) {
        qError("failed to drop cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
        taosMemoryFreeClear(err);
      }
    }
  } else {
    rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
    for (int i = 0; i < cfLen; i++) {
      rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
      if (err != NULL) {
        qError("failed to flush cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
        taosMemoryFreeClear(err);
      }
    }
    rocksdb_flushoptions_destroy(flushOpt);
  }

  for (int i = 0; i < cfLen; i++) {
    rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
  }
  taosMemoryFreeClear(pState->pTdbState->pHandle);

  for (int i = 0; i < cfLen; i++) {
    rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
    rocksdb_block_based_options_destroy(((RocksdbCfParam*)pState->pTdbState->param)[i].tableOpt);
  }

  // The comparator set is only unregistered (and destroyed) on drop; otherwise
  // it stays in the backend's list until streamBackendCleanup.
  if (remove) {
    streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode);
  }

  rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
  pState->pTdbState->writeOpts = NULL;

  rocksdb_readoptions_destroy(pState->pTdbState->readOpts);
  pState->pTdbState->readOpts = NULL;

  taosMemoryFreeClear(pState->pTdbState->cfOpts);
  taosMemoryFreeClear(pState->pTdbState->param);
  pState->pTdbState->rocksdb = NULL;
}
dengyihao's avatar
dengyihao 已提交
726 727 728 729 730 731 732
// Destroys every RocksDB comparator held by the SCfComparator passed as `arg`, then releases the
// comparator array. The SCfComparator object itself is not freed here.
void streamStateDestroyCompar(void* arg) {
  SCfComparator* pCompar = arg;

  int idx = 0;
  while (idx < pCompar->numOfComp) {
    rocksdb_comparator_destroy(pCompar->comp[idx]);
    idx++;
  }

  taosMemoryFree(pCompar->comp);
}
733

dengyihao's avatar
dengyihao 已提交
734
// Looks up `funcName` in ginitDict.
// Returns the index of the entry whose key has the same length and the same bytes as funcName,
// or -1 when no entry matches or funcName is NULL.
int streamGetInit(const char* funcName) {
  if (funcName == NULL) {
    return -1;
  }

  const size_t len = strlen(funcName);
  const size_t numOfEntries = sizeof(ginitDict) / sizeof(ginitDict[0]);
  for (size_t i = 0; i < numOfEntries; i++) {
    // The lengths are compared first, so the cached length safely bounds the comparison.
    if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, len) == 0) {
      return (int)i;
    }
  }
  return -1;
}
dengyihao's avatar
dengyihao 已提交
743
// Seeks `iter` to the key buf[0..len). When that leaves the iterator invalid, retries with
// rocksdb_iter_seek_for_prev on the same key.
// Returns true when the iterator ends up valid, false otherwise.
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
  rocksdb_iter_seek(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }

  rocksdb_iter_seek_for_prev(iter, buf, len);
  return rocksdb_iter_valid(iter) ? true : false;
}
dengyihao's avatar
dengyihao 已提交
753 754
// Creates an iterator over the column family named `cfName`.
//
// pState   - stream state whose RocksDB instance and column family handles are used.
// cfName   - column family name, resolved through streamGetInit().
// snapshot - optional out parameter. When non-NULL a new snapshot is created, stored in *snapshot
//            and attached to the read options; when NULL no snapshot is created.
// readOpt  - out parameter (must be non-NULL) receiving the newly created read options.
//
// Returns the new iterator, or NULL when cfName is unknown (in which case *readOpt and, if given,
// *snapshot are set to NULL). The objects stored in *snapshot and *readOpt are not released here.
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
                                          rocksdb_readoptions_t** readOpt) {
  int idx = streamGetInit(cfName);
  if (idx < 0) {
    // An unknown name would otherwise index pHandle[] with -1.
    qError("streamState failed to get cf name: %s", cfName);
    if (snapshot != NULL) {
      *snapshot = NULL;
    }
    *readOpt = NULL;
    return NULL;
  }

  rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
  *readOpt = rOpt;

  // Only touch *snapshot when the caller actually supplied a place to store it.
  if (snapshot != NULL) {
    *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
    rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
  }
  rocksdb_readoptions_set_fill_cache(rOpt, 0);

  // NOTE(review): the PUT/GET/DEL macros index pHandle[] with ginitDict[i].idx while this uses the
  // dictionary position directly -- confirm the two are always equal.
  return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
}

dengyihao's avatar
dengyihao 已提交
769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
// Encodes `key` and `value` with the ginitDict entry registered for column family `funcname` and
// writes the pair with rocksdb_put_cf.
// The caller must have an integer variable named `code` in scope: it is set to 0 on success and
// to -1 when the column family name is unknown or the write fails.
// The error string returned by RocksDB is logged before it is released.
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen)                                     \
  do {                                                                                                   \
    code = 0;                                                                                            \
    char  buf[128] = {0};                                                                                \
    char* err = NULL;                                                                                    \
    int   i = streamGetInit(funcname);                                                                   \
    if (i < 0) {                                                                                         \
      qWarn("streamState failed to get cf name: %s", funcname);                                          \
      code = -1;                                                                                         \
      break;                                                                                             \
    }                                                                                                    \
    char toString[128] = {0};                                                                            \
    if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString);                          \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                         \
    rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx];              \
    rocksdb_t*                      db = pState->pTdbState->rocksdb;                                     \
    rocksdb_writeoptions_t*         opts = pState->pTdbState->writeOpts;                                 \
    char*                           ttlV = NULL;                                                         \
    int32_t                         ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV);    \
    rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
    if (err != NULL) {                                                                                   \
      qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err);             \
      taosMemoryFree(err);                                                                               \
      code = -1;                                                                                         \
    } else {                                                                                             \
      qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen);             \
    }                                                                                                    \
    taosMemoryFree(ttlV);                                                                                \
  } while (0);

dengyihao's avatar
dengyihao 已提交
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835
// Encodes `key` with the ginitDict entry registered for column family `funcname`, reads the stored
// value with rocksdb_get_cf and decodes it with the entry's deValueFunc.
// The caller must have an integer variable named `code` in scope: it is set to 0 on success and to
// -1 when the column family name is unknown, the key is missing, the read fails, or decoding
// returns a negative length.
// When a raw value was read, the decoded buffer is stored in *pVal (or freed if pVal is NULL) and
// the decoded length is stored in *vLen (if vLen is non-NULL), including on a decode failure.
// The raw length and the decoded length use distinct variables so the decoder receives the length
// reported by RocksDB, and a RocksDB error string is logged and freed exactly once.
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)                                      \
  do {                                                                                                   \
    code = 0;                                                                                            \
    char  buf[128] = {0};                                                                                \
    char* err = NULL;                                                                                    \
    int   i = streamGetInit(funcname);                                                                   \
    if (i < 0) {                                                                                         \
      qWarn("streamState failed to get cf name: %s", funcname);                                          \
      code = -1;                                                                                         \
      break;                                                                                             \
    }                                                                                                    \
    char toString[128] = {0};                                                                            \
    if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString);                          \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                         \
    rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx];              \
    rocksdb_t*                      db = pState->pTdbState->rocksdb;                                     \
    rocksdb_readoptions_t*          opts = pState->pTdbState->readOpts;                                  \
    size_t                          rawLen = 0;                                                          \
    char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, &rawLen, &err);                \
    if (val == NULL) {                                                                                   \
      if (err != NULL) {                                                                                 \
        qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err);          \
        taosMemoryFree(err);                                                                             \
      } else {                                                                                           \
        qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname);        \
      }                                                                                                  \
      code = -1;                                                                                         \
    } else {                                                                                             \
      char*   p = NULL;                                                                                  \
      int32_t decLen = ginitDict[i].deValueFunc(val, rawLen, NULL, &p);                                  \
      if (decLen < 0) {                                                                                  \
        qDebug("streamState str: %s failed to read from %s, err: timeout", toString, funcname);          \
        code = -1;                                                                                       \
      } else {                                                                                           \
        qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, decLen);       \
      }                                                                                                  \
      if (pVal != NULL) {                                                                                \
        *pVal = p;                                                                                       \
      } else {                                                                                           \
        taosMemoryFree(p);                                                                               \
      }                                                                                                  \
      taosMemoryFree(val);                                                                               \
      if (vLen != NULL) *vLen = decLen;                                                                  \
    }                                                                                                    \
  } while (0);

dengyihao's avatar
dengyihao 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
// Encodes `key` with the ginitDict entry registered for column family `funcname` and removes it
// with rocksdb_delete_cf.
// The caller must have an integer variable named `code` in scope: it is set to 0 on success and
// to -1 when the column family name is unknown or the delete reports an error.
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)                                                             \
  do {                                                                                                              \
    char* err = NULL;                                                                                               \
    char  buf[128] = {0};                                                                                           \
    char  toString[128] = {0};                                                                                      \
    int   i = streamGetInit(funcname);                                                                              \
    code = (i < 0) ? -1 : 0;                                                                                        \
    if (code != 0) {                                                                                                \
      qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname);                        \
      break;                                                                                                        \
    }                                                                                                               \
    if (qDebugFlag & DEBUG_TRACE) {                                                                                 \
      ginitDict[i].toStrFunc((void*)key, toString);                                                                 \
    }                                                                                                               \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                    \
    rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx];                         \
    rocksdb_t*                      db = pState->pTdbState->rocksdb;                                                \
    rocksdb_writeoptions_t*         opts = pState->pTdbState->writeOpts;                                            \
    rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err);                                             \
    if (err == NULL) {                                                                                              \
      qDebug("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname);           \
    } else {                                                                                                        \
      qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
             err);                                                                                                  \
      taosMemoryFree(err);                                                                                          \
      code = -1;                                                                                                    \
    }                                                                                                               \
  } while (0);

dengyihao's avatar
dengyihao 已提交
876
// state cf
dengyihao's avatar
dengyihao 已提交
877 878 879 880
// Stores `value` (length vLen) in the "state" column family under `key` combined with this state's
// operator number. Returns the status set by STREAM_STATE_PUT_ROCKSDB: 0 on success, -1 on failure.
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int       code = 0;
  SStateKey stateKey = {.opNum = pState->number, .key = *key};

  STREAM_STATE_PUT_ROCKSDB(pState, "state", &stateKey, (void*)value, vLen);

  return code;
}
// Reads the value stored in the "state" column family under `key` combined with this state's
// operator number. The decoded value and its length are handed back through pVal / pVLen by
// STREAM_STATE_GET_ROCKSDB. Returns 0 on success, -1 on failure.
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int       code = 0;
  SStateKey stateKey = {.opNum = pState->number, .key = *key};

  STREAM_STATE_GET_ROCKSDB(pState, "state", &stateKey, pVal, pVLen);

  return code;
}
// Deletes the entry stored in the "state" column family under `key` combined with this state's
// operator number. Returns the status set by STREAM_STATE_DEL_ROCKSDB: 0 on success, -1 on failure.
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int       code = 0;
  SStateKey stateKey = {.opNum = pState->number, .key = *key};

  STREAM_STATE_DEL_ROCKSDB(pState, "state", &stateKey);

  return code;
}
dengyihao's avatar
dengyihao 已提交
896
// Issues a range delete on column family handle 1 covering the encoded keys from
// {ts = 0, groupId = 0} up to {ts = INT64_MAX, groupId = UINT64_MAX} for this state's operator
// number. A failure is only logged as a warning; the function always returns 0.
int32_t streamStateClear_rocksdb(SStreamState* pState) {
  qDebug("streamStateClear_rocksdb");

  SStateKey lowerKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
  SStateKey upperKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};

  char lowerBuf[128] = {0};
  char upperBuf[128] = {0};
  int  lowerLen = stateKeyEncode(&lowerKey, lowerBuf);
  int  upperLen = stateKeyEncode(&upperKey, upperBuf);

  // Readable forms of both bounds are only produced when trace logging is enabled.
  char lowerStr[128] = {0};
  char upperStr[128] = {0};
  if (qDebugFlag & DEBUG_TRACE) {
    stateKeyToString(&lowerKey, lowerStr);
    stateKeyToString(&upperKey, upperStr);
  }

  char* err = NULL;
  rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
                          lowerBuf, lowerLen, upperBuf, upperLen, &err);
  if (err == NULL) {
    return 0;
  }

  qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, lowerStr, upperStr);
  taosMemoryFree(err);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
929 930
// Advances the cursor's iterator by one entry. Returns -1 when pCur is NULL, otherwise 0.
// pState is not used.
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
  if (pCur == NULL) return -1;

  rocksdb_iter_next(pCur->iter);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
936 937 938 939 940 941 942 943
// Fetches the key of the entry that streamStateSeekKeyNext_rocksdb finds from {ts = 0, groupId = 0}.
// A placeholder entry with that key is written first and deleted again before returning.
// Returns the result of streamStateGetKVByCur_rocksdb (0 when *key was filled in, -1 otherwise).
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
  qDebug("streamStateGetFirst_rocksdb");

  SWinKey minKey = {.ts = 0, .groupId = 0};
  streamStatePut_rocksdb(pState, &minKey, NULL, 0);

  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &minKey);
  int32_t          code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, NULL);
  streamStateFreeCur(pCur);

  streamStateDel_rocksdb(pState, &minKey);
  return code;
}

dengyihao's avatar
dengyihao 已提交
947 948 949 950
// Reads the entry under the cursor via streamStateFillGetKVByCur_rocksdb and accepts it only when
// its group id equals the group id that *pKey carried on entry.
// Returns 0 on a match; -1 when pCur is NULL, the read fails, or the group id differs.
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  qDebug("streamStateGetGroupKVByCur_rocksdb");
  if (pCur == NULL) {
    return -1;
  }

  // Remember the requested group before the read overwrites *pKey.
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }
  return (pKey->groupId == wantedGroup) ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
962 963 964 965 966
// Returns the stored value for `key` when it exists; otherwise hands back a zero-filled buffer.
//
// pVLen - in: size of the buffer to allocate when the key is missing;
//         out: length of the stored value when found, or the requested size otherwise.
// pVal  - out: the stored value, or the newly allocated zero-filled buffer.
//
// Returns 0 on success and -1 when the fallback buffer cannot be allocated.
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  qDebug("streamStateAddIfNotExist_rocksdb");
  int32_t size = *pVLen;
  if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
    return 0;
  }

  // A failed get may have overwritten *pVLen; report the size of the buffer actually returned.
  *pVLen = size;
  *pVal = taosMemoryCalloc(1, size);
  if (*pVal == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
972 973 974
// Moves the cursor's iterator back by one entry. Returns -1 when pCur is NULL, otherwise 0.
// pState is not used.
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
  qDebug("streamStateCurPrev_rocksdb");
  if (pCur == NULL) {
    return -1;
  }

  rocksdb_iter_prev(pCur->iter);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
979 980 981 982 983
// Reads the entry the cursor currently points at.
//
// pKey  - out: window key decoded from the iterator's key.
// pVal  - optional out: pointer returned by rocksdb_iter_value for the current entry.
// pVLen - optional out: value length (0 when pVal is NULL, because the value is then not fetched).
//
// Returns 0 on success; -1 when pCur is NULL, the iterator is invalid, the value is stale, or the
// entry's operator number differs from pCur->number.
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  qDebug("streamStateGetKVByCur_rocksdb");
  if (pCur == NULL) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  size_t    keyLen = 0;
  char*     rawKey = (char*)rocksdb_iter_key(pCur->iter, &keyLen);
  SStateKey decoded;
  stateKeyDecode((void*)&decoded, rawKey);
  if (decoded.opNum != pCur->number) {
    return -1;
  }

  size_t valLen = 0;
  if (pVal != NULL) {
    *pVal = (char*)rocksdb_iter_value(pCur->iter, &valLen);
  }
  if (pVLen != NULL) {
    *pVLen = valLen;
  }
  *pKey = decoded.key;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1000 1001 1002 1003 1004 1005
// Obtains a cursor from streamStateFillGetCur_rocksdb and keeps it only when
// streamStateGetGroupKVByCur_rocksdb succeeds for `key`. Returns the cursor, or NULL when no cursor
// could be obtained or the check fails (the cursor is freed in that case).
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
  qDebug("streamStateGetAndCheckCur_rocksdb");

  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, NULL) == 0) {
    return pCur;
  }

  streamStateFreeCur(pCur);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
1010

dengyihao's avatar
dengyihao 已提交
1011 1012
// Creates a cursor on the "state" column family positioned relative to `key`.
// The iterator is seeked to the encoded key (falling back to seek-for-prev), stale entries are
// skipped forward, and then: if `key` compares greater than the entry found, the cursor stays on
// that entry; otherwise it is advanced one more step.
// Returns the cursor, or NULL when allocation fails or no valid entry remains.
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  qDebug("streamStateSeekKeyNext_rocksdb");
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  pCur->db = pState->pTdbState->rocksdb;
  pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encLen = stateKeyEncode((void*)&target, encoded);

  bool positioned = streamStateIterSeekAndValid(pCur->iter, encoded, encLen);
  if (positioned) {
    // skip ttl expired data
    while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
      rocksdb_iter_next(pCur->iter);
    }
    positioned = rocksdb_iter_valid(pCur->iter);
  }
  if (!positioned) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey curKey;
  size_t    kLen;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  stateKeyDecode((void*)&curKey, keyStr);
  if (stateKeyCmpr(&target, sizeof(target), &curKey, sizeof(curKey)) <= 0) {
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}

dengyihao's avatar
dengyihao 已提交
1048 1049
// Creates a cursor on the "state" column family positioned on the last non-stale entry.
//
// A sentinel entry with the maximum possible key is written first, the iterator is seeked to it
// and stepped backwards past the sentinel and any stale entries, and the sentinel is deleted again
// on every exit path.
// `key` is not used.
// Returns the cursor, or NULL when allocation fails or no valid entry exists.
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
  (void)key;
  qDebug("streamStateSeekToLast_rocksdb");
  int32_t         code = 0;
  const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
  char    buf[128] = {0};
  int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);

  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur != NULL) {
    // Matches streamStateSeekKeyNext_rocksdb: streamStateGetKVByCur_rocksdb rejects entries whose
    // operator number differs from pCur->number.
    pCur->number = pState->number;
    pCur->db = pState->pTdbState->rocksdb;
    pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
    rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);

    rocksdb_iter_prev(pCur->iter);
    while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
      rocksdb_iter_prev(pCur->iter);
    }

    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    }
  }

  // Always remove the sentinel, including when the cursor could not be allocated.
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
  return pCur;
}

dengyihao's avatar
dengyihao 已提交
1075
// Creates a cursor on the "state" column family positioned exactly on `key` (combined with this
// state's operator number).
// Returns the cursor when the iterator lands on a non-stale entry whose decoded key compares equal
// to the requested one; otherwise frees the cursor and returns NULL. Also returns NULL when the
// cursor cannot be allocated.
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  qDebug("streamStateGetCur_rocksdb");
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->db = pState->pTdbState->rocksdb;
  pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encLen = stateKeyEncode((void*)&target, encoded);
  rocksdb_iter_seek(pCur->iter, encoded, encLen);

  bool exactHit = false;
  if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
    size_t valLen;
    char*  val = (char*)rocksdb_iter_value(pCur->iter, &valLen);
    if (!streamStateValueIsStale(val)) {
      SStateKey curKey;
      size_t    kLen = 0;
      char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
      stateKeyDecode((void*)&curKey, keyStr);
      exactHit = (stateKeyCmpr(&target, sizeof(target), &curKey, sizeof(curKey)) == 0);
    }
  }

  if (!exactHit) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  pCur->number = pState->number;
  return pCur;
}
dengyihao's avatar
dengyihao 已提交
1107

dengyihao's avatar
dengyihao 已提交
1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121
// func cf
// Stores `value` (length vLen) in the "func" column family under `key`.
// Returns the status set by STREAM_STATE_PUT_ROCKSDB: 0 on success, -1 on failure.
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  int code = 0;

  STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);

  return code;
}
// Reads the value stored in the "func" column family under `key`; the decoded value and its length
// are handed back through pVal / pVLen by STREAM_STATE_GET_ROCKSDB.
// Returns the status set by the macro (0 on success, -1 on failure) so that callers can tell a
// missing or unreadable entry from a successful read, matching streamStateGet_rocksdb.
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
  return code;
}
// Deletes the entry stored in the "func" column family under `key`.
// Returns the status set by STREAM_STATE_DEL_ROCKSDB (0 on success, -1 on failure) instead of
// unconditionally reporting success, matching streamStateDel_rocksdb.
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1124

dengyihao's avatar
dengyihao 已提交
1125
// session cf
dengyihao's avatar
dengyihao 已提交
1126 1127 1128
// Store a session-window value in the "sess" column family.
// The stored key is the caller's session key tagged with the operator number
// taken from pState->number.
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
// Fetch the session whose window starts exactly at key->win.skey.
//
// A cursor is positioned with streamStateSessionSeekKeyCurrentNext_rocksdb and
// its entry is read. On a match, *key is replaced by the stored key and *pVal
// receives a freshly allocated copy of the value (pVal must be non-NULL).
// pVLen is optional; when given it receives the length of the entry that was
// read, even if the start timestamps then turn out not to match.
//
// Returns 0 on success, -1 when nothing matches or the copy cannot be allocated.
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  qDebug("streamStateSessionGet_rocksdb");
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          vLen = 0;

  int code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
  if (code == 0) {
    if (pVLen != NULL) *pVLen = vLen;

    if (key->win.skey != resKey.win.skey) {
      code = -1;
    } else {
      // Size the copy from the local length: pVLen is optional and must not
      // be dereferenced here.
      void* copy = taosMemoryCalloc(1, vLen);
      if (copy == NULL && vLen > 0) {
        code = -1;
      } else {
        if (vLen > 0) memcpy(copy, tmp, vLen);
        *key = resKey;
        *pVal = copy;
      }
    }
  }
  // NOTE(review): `tmp` is produced by decodeValueFunc inside GetKVByCur; if
  // that buffer is heap-allocated it is never released here -- confirm ownership.
  streamStateFreeCur(pCur);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1157 1158 1159 1160 1161 1162 1163

// Delete a session entry from the "sess" column family.
// The key handed to STREAM_STATE_DEL_ROCKSDB is the caller's session key
// combined with the operator number from pState->number; `code` is returned.
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1164
// Create a cursor on the "sess" column family positioned at `key` or, failing
// that, on the entry before it.
//
// After the initial seek, stale entries are skipped backwards. If the entry
// found compares <= the search key (stateSessionKeyCmpr(search, cur) >= 0) the
// cursor is returned as is; otherwise it is moved back one more entry.
//
// Returns NULL when allocation fails, the seek lands nowhere, or the iterator
// runs off the start. The cursor is released on every failure path; the caller
// owns a non-NULL result and frees it with streamStateFreeCur.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }
  pCur->number = pState->number;
  pCur->db = pState->pTdbState->rocksdb;
  pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);

  char             buf[128] = {0};
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int              len = stateSessionKeyEncode(&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    // Release the cursor here too; this path used to leak it.
    streamStateFreeCur(pCur);
    return NULL;
  }
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  stateSessionKeyDecode(&curKey, (char*)iKey);
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) {
    return pCur;
  }

  rocksdb_iter_prev(pCur->iter);
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}
dengyihao's avatar
dengyihao 已提交
1203
// Create a cursor on the "sess" column family positioned at `key` or on the
// entry after it.
//
// The cursor is returned directly when the entry found by the seek compares
// >= the search key (stateSessionKeyCmpr(search, cur) <= 0); otherwise it is
// advanced once. NULL is returned (and the cursor freed) when allocation
// fails, the seek is invalid, the entry is stale, or the advance leaves the
// iterator invalid.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
  qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return NULL;
  }
  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "sess", &cursor->snapshot, &cursor->readOpt);
  cursor->number = pState->number;

  char             encoded[128] = {0};
  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);

  // Either a failed seek or a stale entry ends the search.
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen) || iterValueIsStale(cursor->iter)) {
    streamStateFreeCur(cursor);
    return NULL;
  }

  size_t           rawLen;
  const char*      rawKey = rocksdb_iter_key(cursor->iter, &rawLen);
  SStateSessionKey found = {0};
  stateSessionKeyDecode(&found, (char*)rawKey);
  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    return cursor;
  }

  rocksdb_iter_next(cursor->iter);
  if (rocksdb_iter_valid(cursor->iter)) {
    return cursor;
  }
  streamStateFreeCur(cursor);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
1238

dengyihao's avatar
dengyihao 已提交
1239
// Create a cursor on the "sess" column family positioned strictly after `key`.
//
// Stale entries following the seek position are skipped. If the first live
// entry compares greater than the search key it is returned; otherwise the
// cursor is advanced once more. NULL is returned (cursor freed) on allocation
// failure or whenever the iterator becomes invalid.
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  qDebug("streamStateSessionSeekKeyNext_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return NULL;
  }
  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "sess", &cursor->snapshot, &cursor->readOpt);
  cursor->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(cursor);
    return NULL;
  }

  // Walk forward past entries flagged as stale.
  for (;;) {
    if (!rocksdb_iter_valid(cursor->iter)) {
      streamStateFreeCur(cursor);
      return NULL;
    }
    if (!iterValueIsStale(cursor->iter)) {
      break;
    }
    rocksdb_iter_next(cursor->iter);
  }

  size_t           rawLen;
  const char*      rawKey = rocksdb_iter_key(cursor->iter, &rawLen);
  SStateSessionKey found = {0};
  stateSessionKeyDecode(&found, (char*)rawKey);
  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) < 0) {
    return cursor;
  }

  rocksdb_iter_next(cursor->iter);
  if (rocksdb_iter_valid(cursor->iter)) {
    return cursor;
  }
  streamStateFreeCur(cursor);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
1275
// Read the session key and value at the cursor's current position.
//
// Returns -1 when the cursor is NULL, the iterator is invalid or stale, the
// value cannot be decoded, the stored operator number differs from
// pCur->number, or pKey->groupId is non-zero and differs from the stored one.
// On success *pKey is overwritten with the stored key and 0 is returned.
// pVal / pVLen are optional; note that they are filled in as soon as decoding
// succeeds, i.e. before the operator-number and group checks run.
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  qDebug("streamStateSessionGetKVByCur_rocksdb");
  if (pCur == NULL) {
    return -1;
  }

  SStateSessionKey stored = {0};
  size_t           rawKeyLen = 0;
  size_t           rawValLen = 0;

  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  const char* rawKey = rocksdb_iter_key(pCur->iter, (size_t*)&rawKeyLen);
  stateSessionKeyDecode((void*)&stored, (char*)rawKey);

  const char* rawVal = rocksdb_iter_value(pCur->iter, (size_t*)&rawValLen);
  char*       payload = NULL;
  int32_t     payloadLen = decodeValueFunc((void*)rawVal, rawValLen, NULL, &payload);
  if (payloadLen < 0) {
    return -1;
  }

  if (pVal != NULL) {
    *pVal = (char*)payload;
  }
  if (pVLen != NULL) {
    *pVLen = payloadLen;
  }

  if (stored.opNum != pCur->number) {
    return -1;
  }
  if (pKey->groupId != 0 && pKey->groupId != stored.key.groupId) {
    return -1;
  }

  *pKey = stored.key;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1308 1309 1310
// fill cf
// Store one entry in the "fill" column family and return the status left in
// `code` by STREAM_STATE_PUT_ROCKSDB.
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
  return code;
}

// Look up `key` in the "fill" column family.
// pVal / pVLen are forwarded to STREAM_STATE_GET_ROCKSDB; the local `code`
// is returned afterwards.
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
  return code;
}
// Delete `key` from the "fill" column family via STREAM_STATE_DEL_ROCKSDB and
// return the local `code`.
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1326

dengyihao's avatar
dengyihao 已提交
1327 1328
// Create a cursor on the "fill" column family positioned exactly at `key`.
// Returns NULL (cursor freed) on allocation failure, when the seek is invalid,
// when the entry is stale, or when the entry found is not equal to `key`
// according to winKeyCmpr.
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  qDebug("streamStateFillGetCur_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return NULL;
  }

  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "fill", &cursor->snapshot, &cursor->readOpt);

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(cursor);
    return NULL;
  }
  if (iterValueIsStale(cursor->iter)) {
    streamStateFreeCur(cursor);
    return NULL;
  }
  if (!rocksdb_iter_valid(cursor->iter)) {
    streamStateFreeCur(cursor);
    return NULL;
  }

  size_t  rawLen;
  SWinKey found;
  char*   rawKey = (char*)rocksdb_iter_key(cursor->iter, &rawLen);
  winKeyDecode((void*)&found, rawKey);
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) != 0) {
    streamStateFreeCur(cursor);
    return NULL;
  }
  return cursor;
}
dengyihao's avatar
dengyihao 已提交
1360 1361 1362 1363 1364 1365
// Read the fill key and value at the cursor's current position.
//
// Returns -1 when the cursor is NULL, the iterator is invalid or stale, or the
// stored value cannot be decoded. On success *pKey receives the decoded key
// and 0 is returned. pVal / pVLen are optional outputs.
//
// *pVLen reports the length returned by decodeValueFunc, i.e. the length of
// the buffer handed back in *pVal. The raw stored length was reported before,
// which does not describe *pVal and disagrees with the session variant of
// this function.
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  qDebug("streamStateFillGetKVByCur_rocksdb");
  if (!pCur) {
    return -1;
  }
  SWinKey winKey;
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }
  size_t tlen;
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
  winKeyDecode(&winKey, keyStr);

  size_t      vlen = 0;
  const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
  char*       dst = NULL;
  int32_t     len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
  if (len < 0) {
    return -1;
  }

  if (pVal != NULL) *pVal = (char*)dst;
  if (pVLen != NULL) *pVLen = len;

  *pKey = winKey;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1388
// Create a cursor on the "fill" column family for the entry following `key`.
//
// After the seek, stale entries are skipped forward. The cursor is returned
// unchanged when winKeyCmpr(key, current) > 0 and advanced once otherwise; in
// the latter case the returned cursor may sit on an invalid iterator.
// Returns NULL (cursor freed) on allocation failure, a failed seek, or when
// only stale entries remain.
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  qDebug("streamStateFillSeekKeyNext_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return NULL;
  }

  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "fill", &cursor->snapshot, &cursor->readOpt);

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(cursor);
    return NULL;
  }

  // Skip stale data; give up if the iterator is exhausted while doing so.
  for (;;) {
    if (!rocksdb_iter_valid(cursor->iter)) {
      streamStateFreeCur(cursor);
      return NULL;
    }
    if (!iterValueIsStale(cursor->iter)) {
      break;
    }
    rocksdb_iter_next(cursor->iter);
  }

  SWinKey found;
  size_t  rawLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(cursor->iter, &rawLen);
  winKeyDecode((void*)&found, rawKey);
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) <= 0) {
    rocksdb_iter_next(cursor->iter);
  }
  return cursor;
}
// Create a cursor on the "fill" column family for the entry preceding `key`.
//
// After the seek, stale entries are skipped backwards. The cursor is returned
// unchanged when winKeyCmpr(key, current) < 0 and moved back once otherwise;
// in the latter case the returned cursor may sit on an invalid iterator.
// Returns NULL (cursor freed) on allocation failure, a failed seek, or when
// only stale entries remain.
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  qDebug("streamStateFillSeekKeyPrev_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return NULL;
  }

  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "fill", &cursor->snapshot, &cursor->readOpt);

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(cursor);
    return NULL;
  }

  // Walk backwards over stale entries.
  for (;;) {
    if (!rocksdb_iter_valid(cursor->iter)) {
      streamStateFreeCur(cursor);
      return NULL;
    }
    if (!iterValueIsStale(cursor->iter)) {
      break;
    }
    rocksdb_iter_prev(cursor->iter);
  }

  SWinKey found;
  size_t  rawLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(cursor->iter, &rawLen);
  winKeyDecode((void*)&found, rawKey);
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) >= 0) {
    rocksdb_iter_prev(cursor->iter);
  }
  return cursor;
}
dengyihao's avatar
dengyihao 已提交
1458
// Find a stored session whose window matches `key` according to
// sessionRangeKeyCmpr and copy its key into *curKey.
//
// The entry found by the seek is tried first. If it does not match, the cursor
// is moved one step (forward when the search key compares greater than the
// entry, backward when it compares smaller) and the neighbour is tried.
// Returns 0 on a match, -1 otherwise; the cursor is always released.
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  qDebug("streamStateSessionGetKeyByRange_rocksdb");
  SStreamStateCur* cursor = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (cursor == NULL) {
    return -1;
  }
  cursor->number = pState->number;
  cursor->db = pState->pTdbState->rocksdb;
  cursor->iter = streamStateIterCreate(pState, "sess", &cursor->snapshot, &cursor->readOpt);

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);
  if (!streamStateIterSeekAndValid(cursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(cursor);
    return -1;
  }

  size_t           rawLen;
  const char*      rawKey = rocksdb_iter_key(cursor->iter, (size_t*)&rawLen);
  SStateSessionKey landed = {0};
  stateSessionKeyDecode(&landed, (char*)rawKey);
  int32_t direction = stateSessionKeyCmpr(&target, sizeof(target), &landed, sizeof(landed));

  SSessionKey resKey = *key;
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(cursor, &resKey, NULL, NULL);
  bool        matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

  if (!matched && direction != 0) {
    // Try the neighbouring entry on the side indicated by the comparison.
    if (direction > 0) {
      streamStateCurNext_rocksdb(pState, cursor);
    } else {
      streamStateCurPrev(pState, cursor);
    }
    code = streamStateSessionGetKVByCur_rocksdb(cursor, &resKey, NULL, NULL);
    matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
  }

  if (matched) {
    *curKey = resKey;
  }
  streamStateFreeCur(cursor);
  return matched ? code : -1;
}

// Find a stored session that falls within `gap` of `key`, or prepare a new one.
//
// The search window is [key->win.skey - gap, key->win.ekey + gap]. The entry
// at/before `key` is examined first, then the following one. When a stored
// session matches the search window (sessionRangeKeyCmpr == 0), its value is
// copied into a new buffer of *pVLen bytes (as passed in), the stored entry is
// deleted, *key holds the stored key and 0 is returned. Otherwise *key is
// restored, the buffer is zero-filled and 1 is returned.
//
// In both cases *pVal ends up pointing at the new buffer, after the previous
// *pVal has been passed to taosMemoryFree.
// Returns -1 if the buffer cannot be allocated (nothing else is touched).
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                                int32_t* pVLen) {
  qDebug("streamStateSessionAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;

  void* tmp = taosMemoryMalloc(valSize);
  if (tmp == NULL) {
    // Every path below writes into tmp, so fail early, as the state-window
    // variant of this function does.
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);

  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel_rocksdb(pState, key);
      goto _end;
    }
    streamStateCurNext_rocksdb(pState, pCur);
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  }

  code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      memcpy(tmp, *pVal, valSize);
      streamStateSessionDel_rocksdb(pState, key);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:
  taosMemoryFree(*pVal);
  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
dengyihao's avatar
dengyihao 已提交
1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
// Overwrite every session value reachable from the zero key with zero bytes.
//
// Starting from a cursor at the all-zero session key, each entry's value is
// read, zero-filled and written back under the same key. The walk stops at the
// first entry that cannot be read or has an empty value.
// NOTE(review): the function returns -1 unconditionally, also after a clean
// walk -- confirm callers ignore the result.
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
  qDebug("streamStateSessionClear_rocksdb");
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* cursor = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &startKey);

  for (;;) {
    SSessionKey entryKey = {0};
    void*       value = NULL;
    int32_t     valueLen = 0;
    int32_t     rc = streamStateSessionGetKVByCur_rocksdb(cursor, &entryKey, &value, &valueLen);
    if (rc != 0 || valueLen <= 0) {
      break;
    }
    memset(value, 0, valueLen);
    streamStateSessionPut_rocksdb(pState, &entryKey, value, valueLen);
    streamStateCurNext_rocksdb(pState, cursor);
  }

  streamStateFreeCur(cursor);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
1585 1586
// State-window counterpart of the session add-if-not-exist lookup.
//
// The entry at/before `key` is reused when its window encloses the requested
// window, or when `fn` reports that pKeyData equals the state key stored in
// the last keyDataLen bytes of the value. Failing that, the following entry is
// tested with `fn` only. A reused entry is copied into a new buffer of *pVLen
// bytes (as passed in), deleted from the store, and 0 is returned with *key
// set to the stored key. Otherwise *key is restored, the buffer is zeroed and
// 1 is returned. *pVal always ends up pointing at the new buffer.
// Returns -1 if the buffer cannot be allocated.
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  qDebug("streamStateStateAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey requested = *key;
  int32_t     valSize = *pVLen;
  void*       out = taosMemoryMalloc(valSize);
  if (out == NULL) {
    return -1;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
  if (code != 0) {
    *key = requested;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  } else {
    bool encloses = key->win.skey <= requested.win.skey && requested.win.ekey <= key->win.ekey;
    // fn is only consulted when the window test fails, as before.
    if (encloses || fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true) {
      memcpy(out, *pVal, valSize);
      streamStateSessionDel_rocksdb(pState, key);
      goto _end;
    }
    streamStateCurNext_rocksdb(pState, pCur);
  }

  code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
  if (code == 0 && fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true) {
    memcpy(out, *pVal, valSize);
    streamStateSessionDel_rocksdb(pState, key);
    goto _end;
  }

  *key = requested;
  res = 1;
  memset(out, 0, valSize);

_end:
  *pVal = out;
  streamStateFreeCur(pCur);
  return res;
}
dengyihao's avatar
dengyihao 已提交
1641

dengyihao's avatar
dengyihao 已提交
1642
//  partag cf
dengyihao's avatar
dengyihao 已提交
1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653
// Store the tag blob for `groupId` in the "partag" column family.
// The address of the by-value groupId is passed as the key; `code` is returned.
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
  return code;
}

// Look up the tag blob for `groupId` in the "partag" column family.
// tagVal / tagLen are forwarded to STREAM_STATE_GET_ROCKSDB; `code` is returned.
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);
  return code;
}
dengyihao's avatar
dengyihao 已提交
1654
// parname cf
dengyihao's avatar
dengyihao 已提交
1655 1656
// Store the table name for `groupId` in the "parname" column family.
// The full TSDB_TABLE_NAME_LEN bytes of tbname are written as the value.
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
  return code;
}
// Look up the table name for `groupId` in the "parname" column family.
// The value is returned through pVal; its length is received in the local
// tagLen and not reported to the caller.
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
  int    code = 0;
  size_t tagLen;
  STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &tagLen);
  return code;
}

dengyihao's avatar
dengyihao 已提交
1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
// Store one entry in the "default" column family.
//
// `key` already points at the key bytes, so it is handed to the macro as is.
// Passing `&key` gave the encoder the address of this function's pointer
// parameter rather than the key. The iterator helpers for this column family
// seek with the key string itself, which is what this now stores under.
// NOTE(review): assumes the "default" key encoder reads its argument as the
// key bytes, as streamStatePutBatch's `enFunc((void*)key, buf)` does.
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
// Look up `key` in the "default" column family.
//
// `key` already points at the key bytes, so it is handed to the macro as is.
// Passing `&key` encoded the address of this function's pointer parameter,
// which can never equal a previously stored key.
// NOTE(review): assumes the "default" key encoder reads its argument as the
// key bytes, as streamStatePutBatch's `enFunc((void*)key, buf)` does.
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
// Delete `key` from the "default" column family.
//
// `key` already points at the key bytes, so it is handed to the macro as is.
// Passing `&key` encoded the address of this function's pointer parameter
// instead of the key to delete.
// NOTE(review): assumes the "default" key encoder reads its argument as the
// key bytes, as streamStatePutBatch's `enFunc((void*)key, buf)` does.
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "default", key);
  return code;
}

// Collect the numeric suffixes of all "default" keys of the form
// "<start>:<int64>" into `result`.
//
// start  NUL-terminated key prefix; iteration begins at this key.
// end    optional NUL-terminated upper bound; iteration stops at the first
//        key that compares greater than it.
// result array receiving one int64_t per matching key.
//
// Entries whose value fails to decode are skipped. Iteration stops at the
// first key that does not extend the prefix. Returns -1 if the iterator cannot
// be created, 0 otherwise.
//
// RocksDB keys are length-delimited, not NUL-terminated, so all comparisons
// use the reported key length and the suffix is copied into a terminated
// scratch buffer before parsing.
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
  int code = 0;

  rocksdb_snapshot_t*    snapshot = NULL;
  rocksdb_readoptions_t* readopts = NULL;
  rocksdb_iterator_t*    pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
  if (pIter == NULL) {
    return -1;
  }

  const char* prefix = (const char*)start;
  const char* upper = (const char*)end;
  size_t      prefixLen = strlen(prefix);
  size_t      upperLen = (upper != NULL) ? strlen(upper) : 0;

  rocksdb_iter_seek(pIter, prefix, prefixLen);
  while (rocksdb_iter_valid(pIter)) {
    // rocksdb_iter_key/value store a size_t through the length pointer, so it
    // must be a real, non-NULL size_t.
    size_t      klen = 0;
    const char* key = rocksdb_iter_key(pIter, &klen);
    size_t      vlen = 0;
    const char* vval = rocksdb_iter_value(pIter, &vlen);
    if (decodeValueFunc((void*)vval, (int32_t)vlen, NULL, NULL) < 0) {
      rocksdb_iter_next(pIter);
      continue;
    }

    if (upper != NULL) {
      // Length-aware equivalent of strcmp(key, end) > 0.
      size_t common = (klen < upperLen) ? klen : upperLen;
      int    c = memcmp(key, upper, common);
      if (c > 0 || (c == 0 && klen > upperLen)) {
        break;
      }
    }

    // The key must start with the prefix and carry at least one more byte.
    if (klen <= prefixLen || memcmp(key, prefix, prefixLen) != 0) {
      break;
    }

    // Parse ":<int64>" from the bytes following the prefix.
    char   suffix[32] = {0};
    size_t suffixLen = klen - prefixLen;
    if (suffixLen < sizeof(suffix)) {
      memcpy(suffix, key + prefixLen, suffixLen);
      int64_t checkPoint = 0;
      if (sscanf(suffix, ":%" PRId64 "", &checkPoint) == 1) {
        taosArrayPush(result, &checkPoint);
      }
    }
    rocksdb_iter_next(pIter);
  }
  rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
  rocksdb_readoptions_destroy(readopts);
  rocksdb_iter_destroy(pIter);
  return code;
}
// Allocate a cursor over the "default" column family.
// Returns the cursor as an opaque pointer, or NULL if allocation fails.
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = pState->pTdbState->rocksdb;
  pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
  return pCur;
}
// Report whether the cursor's iterator is positioned on an entry
// (1 when valid, 0 otherwise).
int32_t streamDefaultIterValid_rocksdb(void* iter) {
  SStreamStateCur* cursor = iter;
  if (rocksdb_iter_valid(cursor->iter)) {
    return 1;
  }
  return 0;
}
// Position the cursor's iterator at `key`, a NUL-terminated string whose
// length is taken with strlen.
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
  SStreamStateCur* cursor = iter;
  size_t           keyLen = strlen(key);
  rocksdb_iter_seek(cursor->iter, key, keyLen);
}
void streamDefaultIterNext_rocksdb(void* iter) {
  SStreamStateCur* pCur = iter;
  rocksdb_iter_next(pCur->iter);
}
// Return the key at the cursor's current position.
// *len (optional) receives the key length. The pointer is the one returned by
// rocksdb_iter_key; no copy is made here.
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
  SStreamStateCur* pCur = iter;
  // rocksdb_iter_key writes a size_t; route it through a local so the caller's
  // 32-bit variable is not overrun.
  size_t klen = 0;
  char*  key = (char*)rocksdb_iter_key(pCur->iter, &klen);
  if (len != NULL) {
    *len = (int32_t)klen;
  }
  return key;
}
// Return the decoded value at the cursor's current position, or NULL when
// decoding fails.
// *len (optional) receives the decoded length on success and is left untouched
// on failure. The returned buffer is whatever decodeValueFunc produced.
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
  SStreamStateCur* pCur = iter;
  // rocksdb_iter_value writes a size_t, so the length variable must be one.
  size_t      vlen = 0;
  char*       dst = NULL;
  const char* vval = rocksdb_iter_value(pCur->iter, &vlen);
  int32_t     decodedLen = decodeValueFunc((void*)vval, (int32_t)vlen, NULL, &dst);
  if (decodedLen < 0) {
    return NULL;
  }
  if (len != NULL) {
    *len = decodedLen;
  }
  return dst;
}
// batch func
// Create an empty RocksDB write batch and return it as an opaque pointer.
void* streamStateCreateBatch() {
  return rocksdb_writebatch_create();
}
// Number of updates queued in the batch; 0 for a NULL batch.
int32_t streamStateGetBatchSize(void* pBatch) {
  if (pBatch != NULL) {
    return rocksdb_writebatch_count(pBatch);
  }
  return 0;
}

void    streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void    streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
// Queue a put of (key, val) for the named column family into a write batch.
//
// The column family's entry in ginitDict supplies the key encoder and value
// encoder; the encoded value buffer is released once it has been queued.
// Returns -1 when the column-family name is unknown, 0 otherwise.
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen) {
  int slot = streamGetInit(cfName);
  if (slot < 0) {
    qError("streamState failed to put to cf name:%s", cfName);
    return -1;
  }

  char    encodedKey[128] = {0};
  int32_t encodedKeyLen = ginitDict[slot].enFunc((void*)key, encodedKey);

  char*   encodedVal = NULL;
  int32_t encodedValLen = ginitDict[slot].enValueFunc(val, vlen, 0, &encodedVal);

  rocksdb_column_family_handle_t* handle = pState->pTdbState->pHandle[ginitDict[slot].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, handle, encodedKey, (size_t)encodedKeyLen, encodedVal,
                            (size_t)encodedValLen);
  taosMemoryFree(encodedVal);
  return 0;
}
// Apply a write batch to the state database using the state's write options.
// Returns 0 on success; on failure the RocksDB error text is logged and
// released, and -1 is returned.
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
  char* errMsg = NULL;
  rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &errMsg);
  if (errMsg == NULL) {
    return 0;
  }

  qError("streamState failed to write batch, err:%s", errMsg);
  taosMemoryFree(errMsg);
  return -1;
}