提交 981466a0 编写于 作者: dengyihao's avatar dengyihao

failed to read from

上级 40535b5e
......@@ -1038,6 +1038,7 @@ int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVa
}
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
......
......@@ -65,9 +65,9 @@ int stateKeyDecode(void* k, char* buf) {
int stateKeyToString(void* k, char* buf) {
SStateKey* key = k;
int n = 0;
n += sprintf(buf + n, "groupId:%" PRId64 " ", key->key.groupId);
n += sprintf(buf + n, "ts:%" PRIi64 " ", key->key.ts);
n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum);
n += sprintf(buf + n, "[groupId:%" PRId64 ",", key->key.groupId);
n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
return n;
}
......@@ -124,10 +124,10 @@ int stateSessionKeyDecode(void* ses, char* buf) {
int stateSessionKeyToString(void* k, char* buf) {
SStateSessionKey* key = k;
int n = 0;
n += sprintf(buf + n, "skey:%" PRIi64 " ", key->key.win.skey);
n += sprintf(buf + n, "ekey:%" PRIi64 " ", key->key.win.ekey);
n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->key.groupId);
n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum);
n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
return n;
}
......@@ -173,8 +173,8 @@ int winKeyDecode(void* k, char* buf) {
int winKeyToString(void* k, char* buf) {
SWinKey* key = k;
int n = 0;
n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId);
n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts);
n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
return n;
}
/*
......@@ -222,9 +222,9 @@ int tupleKeyDecode(void* k, char* buf) {
int tupleKeyToString(void* k, char* buf) {
int n = 0;
STupleKey* key = k;
n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId);
n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts);
n += sprintf(buf + n, "exprIdx:%d ", key->exprIdx);
n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
return n;
}
......@@ -258,7 +258,7 @@ int parKeyDecode(void* k, char* buf) {
int parKeyToString(void* k, char* buf) {
int64_t* key = k;
int n = 0;
n = sprintf(buf + n, "groupId:%" PRIi64 " ", *key);
n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key);
return n;
}
......@@ -360,7 +360,7 @@ int streamGetInit(const char* funcName) {
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
......@@ -372,10 +372,10 @@ int streamGetInit(const char* funcName) {
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
qWarn("str: %s failed to write to %s, err: %s", toString, funcname, err); \
qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
qWarn("str:%s succ to write to %s", toString, funcname); \
qWarn("streamState str:%s succ to write to %s", toString, funcname); \
} \
} while (0);
......@@ -386,7 +386,7 @@ int streamGetInit(const char* funcName) {
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
......@@ -397,17 +397,17 @@ int streamGetInit(const char* funcName) {
rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \
if (val == NULL) { \
qWarn("str: %s failed to read from %s, err: not exist", toString, funcname); \
qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
code = -1; \
} else { \
*pVal = val; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qWarn("str: %s failed to read from %s, err: %s", toString, funcname, err); \
qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qWarn("str: %s succ to read from %s", toString, funcname); \
if (code == 0) qWarn("streamState str: %s succ to read from %s", toString, funcname); \
} \
} while (0);
......@@ -418,7 +418,7 @@ int streamGetInit(const char* funcName) {
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
......@@ -429,11 +429,11 @@ int streamGetInit(const char* funcName) {
rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \
if (err != NULL) { \
qWarn("str: %s failed to del from %s, err: %s", toString, funcname, err); \
qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qWarn("str: %s succ to del from %s", toString, funcname); \
qWarn("streamState str: %s succ to del from %s", toString, funcname); \
} \
} while (0);
......@@ -485,6 +485,7 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con
// todo refactor
int32_t streamStateClear_rocksdb(SStreamState* pState) {
qWarn("streamStateClear_rocksdb");
SWinKey key = {.ts = 0, .groupId = 0};
// batch clear later
streamStatePut_rocksdb(pState, &key, NULL, 0);
......@@ -521,6 +522,7 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
return code;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qWarn("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
......@@ -550,6 +552,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qWarn("streamStateSessionSeekKeyCurrentNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
......@@ -580,6 +583,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
}
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qWarn("streamStateSessionSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
......@@ -610,6 +614,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
}
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qWarn("streamStateAddIfNotExist_rocksdb");
int32_t size = *pVLen;
if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return 0;
......@@ -619,6 +624,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
return 0;
}
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->iter =
......@@ -643,6 +649,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
return NULL;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
......@@ -665,6 +672,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
return NULL;
}
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qWarn("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
......@@ -674,6 +682,7 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
return NULL;
}
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1;
SStateKey tkey;
SStateKey* pKtmp = &tkey;
......@@ -691,6 +700,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
return -1;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
......@@ -707,6 +717,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
return 0;
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateGetGroupKVByCur_rocksdb");
if (!pCur) {
return -1;
}
......@@ -721,6 +732,7 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
return -1;
}
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qWarn("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
......@@ -730,6 +742,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
return code;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qWarn("streamStateSessionGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
......@@ -758,6 +771,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
}
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
......@@ -790,6 +804,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL;
}
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) {
return NULL;
......@@ -820,6 +835,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
return NULL;
}
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillSeekKeyPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
......@@ -851,6 +867,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
return NULL;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qWarn("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
......@@ -869,6 +886,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur)
return 0;
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qWarn("streamStateSessionGetKeyByRange_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return -1;
......@@ -925,6 +943,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qWarn("streamStateSessionGet_rocksdb");
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key;
......@@ -953,6 +972,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
}
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
qWarn("streamStateSessionAddIfNotExist_rocksdb");
// todo refactor
int32_t res = 0;
SSessionKey originKey = *key;
......@@ -999,6 +1019,7 @@ _end:
}
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
qWarn("streamStateStateAddIfNotExist_rocksdb");
// todo refactor
int32_t res = 0;
SSessionKey tmpKey = *key;
......@@ -1054,6 +1075,7 @@ _end:
}
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qWarn("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册