diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7175f1be7439d78b35b4760da8a81f9d8c65788b..fc99202bce042b6f1100dba7ab19693275307747 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1143,7 +1143,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad vload = {0}; - int64_t reserved = 0; + int64_t reserved64 = 0; + int32_t reserved32 = 0; if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1; if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1; if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1; @@ -1155,9 +1156,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1; - if (tDecodeI32(&decoder, (int32_t *)&reserved) < 0) return -1; - if (tDecodeI64(&decoder, &reserved) < 0) return -1; - if (tDecodeI64(&decoder, &reserved) < 0) return -1; + if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1; + if (tDecodeI64(&decoder, &reserved64) < 0) return -1; + if (tDecodeI64(&decoder, &reserved64) < 0) return -1; if (taosArrayPush(pReq->pVloads, &vload) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -1545,6 +1546,7 @@ int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pR } int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRsp) { + char *key = NULL, *value = NULL; pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); @@ -1553,40 +1555,40 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL || pRsp->writeTbs == NULL || pRsp->useDbs == NULL) { - return -1; + goto _err; } - if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->enable) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) goto _err; + if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) goto _err; + if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) goto _err; + if (tDecodeI8(pDecoder, &pRsp->enable) < 0) goto _err; + if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) goto _err; + if (tDecodeI32(pDecoder, &pRsp->version) < 0) goto _err; int32_t numOfCreatedDbs = 0; int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; - if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) return -1; - if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) return -1; - if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) return -1; + if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) goto _err; + if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) goto _err; + if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) goto _err; for (int32_t i = 0; i < numOfCreatedDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - if (tDecodeCStrTo(pDecoder, db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, db) < 0) goto _err; int32_t len = strlen(db); taosHashPut(pRsp->createdDbs, db, len, db, len + 1); } for (int32_t i = 0; i < numOfReadDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - if (tDecodeCStrTo(pDecoder, db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, db) < 0) goto _err; int32_t len = strlen(db); taosHashPut(pRsp->readDbs, db, len, db, len + 1); } for (int32_t i = 0; i < numOfWriteDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - if (tDecodeCStrTo(pDecoder, db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, db) < 0) goto _err; int32_t len = strlen(db); taosHashPut(pRsp->writeDbs, db, len, db, len + 1); } @@ -1595,67 +1597,80 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs int32_t numOfReadTbs = 0; int32_t numOfWriteTbs = 0; int32_t numOfUseDbs = 0; - if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) return -1; - if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) return -1; - if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) return -1; + if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) goto _err; + if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) goto _err; + if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) goto _err; for (int32_t i = 0; i < numOfReadTbs; ++i) { int32_t keyLen = 0; - if (tDecodeI32(pDecoder, &keyLen) < 0) return -1; + if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err; - char *key = taosMemoryCalloc(keyLen + 1, sizeof(char)); - if (tDecodeCStrTo(pDecoder, key) < 0) return -1; + key = taosMemoryCalloc(keyLen + 1, sizeof(char)); + if (tDecodeCStrTo(pDecoder, key) < 0) goto _err; int32_t valuelen = 0; - if (tDecodeI32(pDecoder, &valuelen) < 0) return -1; - char *value = taosMemoryCalloc(valuelen + 1, sizeof(char)); - if (tDecodeCStrTo(pDecoder, value) < 0) return -1; + if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err; + + value = taosMemoryCalloc(valuelen + 1, sizeof(char)); + if (tDecodeCStrTo(pDecoder, value) < 0) goto _err; taosHashPut(pRsp->readTbs, key, strlen(key), value, valuelen + 1); - taosMemoryFree(key); - taosMemoryFree(value); + taosMemoryFreeClear(key); + taosMemoryFreeClear(value); } for (int32_t i = 0; i < numOfWriteTbs; ++i) { int32_t keyLen = 0; - if (tDecodeI32(pDecoder, &keyLen) < 0) return -1; + if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err; - char *key = taosMemoryCalloc(keyLen + 1, sizeof(char)); - if (tDecodeCStrTo(pDecoder, key) < 0) return -1; + key = taosMemoryCalloc(keyLen + 1, sizeof(char)); + if (tDecodeCStrTo(pDecoder, key) < 0) goto _err; int32_t valuelen = 0; - if (tDecodeI32(pDecoder, &valuelen) < 0) return -1; - char *value = taosMemoryCalloc(valuelen + 1, sizeof(char)); - if (tDecodeCStrTo(pDecoder, value) < 0) return -1; + if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err; + + value = taosMemoryCalloc(valuelen + 1, sizeof(char)); + if (tDecodeCStrTo(pDecoder, value) < 0) goto _err; taosHashPut(pRsp->writeTbs, key, strlen(key), value, valuelen + 1); - taosMemoryFree(key); - taosMemoryFree(value); + taosMemoryFreeClear(key); + taosMemoryFreeClear(value); } for (int32_t i = 0; i < numOfUseDbs; ++i) { int32_t keyLen = 0; - if (tDecodeI32(pDecoder, &keyLen) < 0) return -1; + if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err; - char *key = taosMemoryCalloc(keyLen + 1, sizeof(char)); - if (tDecodeCStrTo(pDecoder, key) < 0) return -1; + key = taosMemoryCalloc(keyLen + 1, sizeof(char)); + if (tDecodeCStrTo(pDecoder, key) < 0) goto _err; int32_t ref = 0; - if (tDecodeI32(pDecoder, &ref) < 0) return -1; + if (tDecodeI32(pDecoder, &ref) < 0) goto _err; + taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref)); - taosMemoryFree(key); + taosMemoryFreeClear(key); } // since 3.0.7.0 if (!tDecodeIsEnd(pDecoder)) { - if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) goto _err; } else { pRsp->passVer = 0; } } - return 0; +_err: + taosHashCleanup(pRsp->createdDbs); + taosHashCleanup(pRsp->readDbs); + taosHashCleanup(pRsp->writeDbs); + taosHashCleanup(pRsp->writeTbs); + taosHashCleanup(pRsp->readTbs); + taosHashCleanup(pRsp->useDbs); + + taosMemoryFreeClear(key); + taosMemoryFreeClear(value); + return -1; } int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { @@ -2844,7 +2859,6 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) { return 0; } - int32_t tSerializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2908,7 +2922,7 @@ int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) { return 0; } -int32_t tDeserializeSDbHbRspImp(SDecoder* decoder, SDbHbRsp* pRsp) { +int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) { int8_t flag = 0; if (tDecodeI8(decoder, &flag) < 0) return -1; if (flag) { @@ -3196,7 +3210,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) { return tlen; } -int32_t tDeserializeSDbCfgRspImpl(SDecoder* decoder, SDbCfgRsp *pRsp) { +int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) { if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1; if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1; if (tDecodeI32(decoder, &pRsp->cfgVersion) < 0) return -1; @@ -5306,10 +5320,10 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { return 0; } -int32_t tDeatroySMqHbReq(SMqHbReq* pReq){ - for(int i = 0; i < taosArrayGetSize(pReq->topics); i++){ - TopicOffsetRows* vgs = taosArrayGet(pReq->topics, i); - if(vgs) taosArrayDestroy(vgs->offsetRows); +int32_t tDeatroySMqHbReq(SMqHbReq *pReq) { + for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) { + TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i); + if (vgs) taosArrayDestroy(vgs->offsetRows); } taosArrayDestroy(pReq->topics); return 0; @@ -5326,7 +5340,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { int32_t sz = taosArrayGetSize(pReq->topics); if (tEncodeI32(&encoder, sz) < 0) return -1; for (int32_t i = 0; i < sz; ++i) { - TopicOffsetRows* vgs = (TopicOffsetRows*)taosArrayGet(pReq->topics, i); + TopicOffsetRows *vgs = (TopicOffsetRows *)taosArrayGet(pReq->topics, i); if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1; int32_t szVgs = taosArrayGetSize(vgs->offsetRows); if (tEncodeI32(&encoder, szVgs) < 0) return -1; @@ -5356,19 +5370,19 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; int32_t sz = 0; if (tDecodeI32(&decoder, &sz) < 0) return -1; - if(sz > 0){ + if (sz > 0) { pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows)); if (NULL == pReq->topics) return -1; for (int32_t i = 0; i < sz; ++i) { - TopicOffsetRows* data = taosArrayReserve(pReq->topics, 1); + TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1); tDecodeCStrTo(&decoder, data->topicName); int32_t szVgs = 0; if (tDecodeI32(&decoder, &szVgs) < 0) return -1; - if(szVgs > 0){ + if (szVgs > 0) { data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); if (NULL == data->offsetRows) return -1; - for (int32_t j= 0; j < szVgs; ++j) { - OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); + for (int32_t j = 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayReserve(data->offsetRows, 1); if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1; if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1; if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1; @@ -5382,7 +5396,6 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { return 0; } - int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -5610,9 +5623,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); -// SMsgHead *pHead = buf; -// pHead->vgId = pReq->head.vgId; -// pHead->contLen = pReq->head.contLen; + // SMsgHead *pHead = buf; + // pHead->vgId = pReq->head.vgId; + // pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); @@ -6983,7 +6996,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { return 0; } -int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs) { +int32_t tDecodeSVAlterTbReqSetCtime(SDecoder *pDecoder, SVAlterTbReq *pReq, int64_t ctimeMs) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1; @@ -7216,13 +7229,13 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { return 0; } -int32_t tEncodeMqVgOffset(SEncoder* pEncoder, const SMqVgOffset* pOffset) { +int32_t tEncodeMqVgOffset(SEncoder *pEncoder, const SMqVgOffset *pOffset) { if (tEncodeSTqOffset(pEncoder, &pOffset->offset) < 0) return -1; if (tEncodeI64(pEncoder, pOffset->consumerId) < 0) return -1; return 0; } -int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset) { +int32_t tDecodeMqVgOffset(SDecoder *pDecoder, SMqVgOffset *pOffset) { if (tDecodeSTqOffset(pDecoder, &pOffset->offset) < 0) return -1; if (tDecodeI64(pDecoder, &pOffset->consumerId) < 0) return -1; return 0; @@ -7407,7 +7420,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { } int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { - if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; + if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 32b63fa9509845619dbd6177e6a36b4282ec23ee..f7f57f2455fb9bdc47a20301795a8fa6ca561fb8 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -455,7 +455,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { } } - if (diffIdx == -1 && diffIdx == 0) { + if (diffIdx == -1 || diffIdx == 0) { goto _err; } @@ -1654,10 +1654,11 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb if (ret < 0) { terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; return -1; + } else { + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; } - uid = *(tb_uid_t *)pVal; - tdbFree(pVal); - pVal = NULL; if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) { ret = -1; @@ -1736,12 +1737,16 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb nTagData = tDataTypes[pCol->type].bytes; } if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) { + tdbFree(pKey); + tdbFree(pVal); metaDestroyTagIdxKey(pTagIdxKey); + tdbTbcClose(pCtbIdxc); goto _err; } tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); } + tdbTbcClose(pCtbIdxc); return 0; _err: diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cebe4e8204777926bf6aabf3352436595b84011d..242256cd347a35defe294be404c8460df8cb21f1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -190,8 +190,8 @@ void streamBackendCleanup(void* arg) { taosThreadMutexDestroy(&pHandle->cfMutex); - taosMemoryFree(pHandle); qDebug("destroy stream backend backend:%p", pHandle); + taosMemoryFree(pHandle); return; } SListNode* streamBackendAddCompare(void* backend, void* arg) { @@ -704,8 +704,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t char suffix[64] = {0}; rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); - RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); - rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**)); + RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < nCf; i++) { @@ -861,7 +861,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { param[i].tableOpt = tableOpt; }; - rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); for (int i = 0; i < cfLen; i++) { SCfInit* cf = &ginitDict[i]; @@ -1012,16 +1012,15 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { - int idx = streamStateGetCfIdx(pState, cfName); + int idx = streamStateGetCfIdx(pState, cfName); + rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); + *readOpt = rOpt; if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); + rocksdb_readoptions_set_snapshot(rOpt, *snapshot); + rocksdb_readoptions_set_fill_cache(rOpt, 0); } - rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); - *readOpt = rOpt; - - rocksdb_readoptions_set_snapshot(rOpt, *snapshot); - rocksdb_readoptions_set_fill_cache(rOpt, 0); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); @@ -1049,8 +1048,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ - taosMemoryFree(err); \ qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ code = -1; \ } else { \ qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ @@ -1389,8 +1388,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); - if (code == -1) { - } return code; } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -1408,8 +1405,10 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo code = -1; } else { *key = resKey; - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); + if (pVal != NULL && pVLen != NULL) { + *pVal = taosMemoryCalloc(1, *pVLen); + memcpy(*pVal, tmp, *pVLen); + } } } taosMemoryFree(tmp); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 04b546b36a48bd1df071c7b077d82c57c2ddea8a..c483d82027ae971da2644fb5c2ef8d2ee1f94c6c 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -391,7 +391,13 @@ static void httpHandleReq(SHttpMsg* msg) { // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5); - int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + if (fd < 0) { + tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); + taosReleaseRef(httpRefMgt, httpRef); + destroyHttpClient(cli); + return; + } + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); taosReleaseRef(httpRefMgt, httpRef);