diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 44a97bf05517241cf32d48fcfe940dcbb216859a..12d0de8b6b8a083c548dbf1b6f4454f3162731d2 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -24,25 +24,31 @@ extern "C" { // When you want to use this feature, you should find or add the same function in the following sectio #if !defined(WINDOWS) -#ifndef ALLOW_FORBID_FUNC -#define malloc MALLOC_FUNC_TAOS_FORBID -#define calloc CALLOC_FUNC_TAOS_FORBID -#define realloc REALLOC_FUNC_TAOS_FORBID -#define free FREE_FUNC_TAOS_FORBID +// #ifndef ALLOW_FORBID_FUNC +// #define malloc MALLOC_FUNC_TAOS_FORBID +// #define calloc CALLOC_FUNC_TAOS_FORBID +// #define realloc REALLOC_FUNC_TAOS_FORBID +// #define free FREE_FUNC_TAOS_FORBID #ifdef strdup #undef strdup -#define strdup STRDUP_FUNC_TAOS_FORBID +#define strdup STRDUP_FUNC_TAOS_FORBID #endif -#endif // ifndef ALLOW_FORBID_FUNC -#endif // if !defined(WINDOWS) +// #endif // ifndef ALLOW_FORBID_FUNC +#endif // if !defined(WINDOWS) + +// #define taosMemoryFree malloc +#define taosMemoryMalloc malloc +#define taosMemoryCalloc calloc +#define taosMemoryRealloc realloc +#define taosMemoryFree free int32_t taosMemoryDbgInit(); int32_t taosMemoryDbgInitRestore(); -void *taosMemoryMalloc(int64_t size); -void *taosMemoryCalloc(int64_t num, int64_t size); -void *taosMemoryRealloc(void *ptr, int64_t size); -char *taosStrdup(const char *ptr); -void taosMemoryFree(void *ptr); +// void *taosMemoryMalloc(int64_t size); +// void *taosMemoryCalloc(int64_t num, int64_t size); +// void *taosMemoryRealloc(void *ptr, int64_t size); +char *taosStrdup(const char *ptr); +// void taosMemoryFree(void *ptr); int64_t taosMemorySize(void *ptr); void taosPrintBackTrace(); void taosMemoryTrim(int32_t size); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 3b037e206290ff767a1f879e0b46aadf8b3e4f20..4a055102178ce49534eab6f07439e65991c917a5 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -13,23 +13,25 @@ * along with this program. If not, see . */ +#include "ctgRemote.h" #include "catalogInt.h" #include "query.h" #include "systable.h" #include "tname.h" #include "tref.h" #include "trpc.h" -#include "ctgRemote.h" + +typedef void* (*MallocType)(int64_t); int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { - int32_t code = 0; - SCatalog* pCtg = pJob->pCtg; - int32_t taskNum = taosArrayGetSize(cbParam->taskId); - SDataBuf taskMsg = *pMsg; - int32_t msgNum = 0; - SBatchRsp batchRsp = {0}; - SBatchRspMsg rsp = {0}; - SBatchRspMsg *pRsp = NULL; + int32_t code = 0; + SCatalog* pCtg = pJob->pCtg; + int32_t taskNum = taosArrayGetSize(cbParam->taskId); + SDataBuf taskMsg = *pMsg; + int32_t msgNum = 0; + SBatchRsp batchRsp = {0}; + SBatchRspMsg rsp = {0}; + SBatchRspMsg* pRsp = NULL; if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) { if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) { @@ -39,7 +41,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu msgNum = taosArrayGetSize(batchRsp.pRsps); } - + if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) { msgNum = 0; } @@ -582,8 +584,8 @@ _return: return code; } -int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) { - int32_t num = taosArrayGetSize(pBatch->pMsgs); +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t* pSize) { + int32_t num = taosArrayGetSize(pBatch->pMsgs); if (num >= CTG_MAX_REQ_IN_BATCH) { qError("too many msgs %d in one batch request", num); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -599,7 +601,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t qError("tSerializeSBatchReq failed"); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + *msg = taosMemoryCalloc(1, msgSize); if (NULL == (*msg)) { qError("calloc batchReq msg failed, size:%d", msgSize); @@ -625,7 +627,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { size_t len = 0; int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; - int32_t msgSize = 0; + int32_t msgSize = 0; ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); @@ -654,7 +656,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_QNODE_LIST; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -708,7 +710,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_DNODE_LIST; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -759,7 +761,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU int32_t msgLen = 0; int32_t reqType = TDMT_MND_USE_DB; SCtgTask* pTask = tReq ? tReq->pTask : NULL; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); @@ -811,7 +813,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_DB_CFG; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); @@ -866,7 +868,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_INDEX; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get index from mnode, indexName:%s", indexName); @@ -921,7 +923,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_TABLE_INDEX; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); @@ -978,7 +980,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_RETRIEVE_FUNC; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get udf info from mnode, funcName:%s", funcName); @@ -1033,7 +1035,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_USER_AUTH; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get user auth from mnode, user:%s", user); @@ -1093,7 +1095,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* int32_t reqType = TDMT_MND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, tbName); - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); @@ -1156,7 +1158,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa int32_t reqType = TDMT_VND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, @@ -1225,7 +1227,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t reqType = TDMT_VND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1290,7 +1292,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t reqType = TDMT_MND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1342,7 +1344,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_SERVER_VERSION; - void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; qDebug("try to get svr ver from mnode"); diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index c63020e64d9c6cbe6fe692ffdae02368d0a5aa39..7ceee9b2f2a32c70892c20b37bb6fda635800284 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -3,7 +3,6 @@ add_library(stream STATIC ${STREAM_SRC}) target_include_directories( stream PUBLIC "${TD_SOURCE_DIR}/include/libs/stream" - PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) @@ -13,6 +12,11 @@ if(${BUILD_WITH_ROCKSDB}) PUBLIC rocksdb tdb PRIVATE os util transport qcom executor ) + target_include_directories( + stream + PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" + ) + add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 1ae4afe0e033731d26072a02e51a4fe34dc0a30f..94bb681240bc9cc3edc535903ff64875f388d2c1 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -254,63 +254,64 @@ int32_t taosMemoryDbgInitRestore() { #endif } -void *taosMemoryMalloc(int64_t size) { -#ifdef USE_TD_MEMORY - void *tmp = malloc(size + sizeof(TdMemoryInfo)); - if (tmp == NULL) return NULL; - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; - pTdMemoryInfo->memorySize = size; - pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - - return (char *)tmp + sizeof(TdMemoryInfo); -#else - return malloc(size); -#endif -} - -void *taosMemoryCalloc(int64_t num, int64_t size) { -#ifdef USE_TD_MEMORY - int32_t memorySize = num * size; - char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); - if (tmp == NULL) return NULL; - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; - pTdMemoryInfo->memorySize = memorySize; - pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); +// void *taosMemoryMalloc(int64_t size) { +// #ifdef USE_TD_MEMORY +// void *tmp = malloc(size + sizeof(TdMemoryInfo)); +// if (tmp == NULL) return NULL; + +// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; +// pTdMemoryInfo->memorySize = size; +// pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; +// taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); + +// return (char *)tmp + sizeof(TdMemoryInfo); +// #else +// return malloc(size); +// #endif +// } - return (char *)tmp + sizeof(TdMemoryInfo); -#else - return calloc(num, size); -#endif -} +// void *taosMemoryCalloc(int64_t num, int64_t size) { +// #ifdef USE_TD_MEMORY +// int32_t memorySize = num * size; +// char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); +// if (tmp == NULL) return NULL; + +// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; +// pTdMemoryInfo->memorySize = memorySize; +// pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; +// taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); + +// return (char *)tmp + sizeof(TdMemoryInfo); +// #else +// return calloc(num, size); +// #endif +// } -void *taosMemoryRealloc(void *ptr, int64_t size) { -#ifdef USE_TD_MEMORY - if (ptr == NULL) return taosMemoryMalloc(size); +// void *taosMemoryRealloc(void *ptr, int64_t size) { +// #ifdef USE_TD_MEMORY +// if (ptr == NULL) return taosMemoryMalloc(size); - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); - ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); - if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { -+ return NULL; -+ } +// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); +// ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); +// if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { +// +return NULL; +// + +// } - TdMemoryInfo tdMemoryInfo; - memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo)); +// TdMemoryInfo tdMemoryInfo; +// memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo)); - void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); - if (tmp == NULL) return NULL; +// void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); +// if (tmp == NULL) return NULL; - memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); - ((TdMemoryInfoPtr)tmp)->memorySize = size; +// memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); +// ((TdMemoryInfoPtr)tmp)->memorySize = size; - return (char *)tmp + sizeof(TdMemoryInfo); -#else - return realloc(ptr, size); -#endif -} +// return (char *)tmp + sizeof(TdMemoryInfo); +// #else +// return realloc(ptr, size); +// #endif +// } char *taosStrdup(const char *ptr) { #ifdef USE_TD_MEMORY @@ -319,8 +320,8 @@ char *taosStrdup(const char *ptr) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { - return NULL; - } + return NULL; + } void *tmp = tstrdup(pTdMemoryInfo); if (tmp == NULL) return NULL; @@ -333,21 +334,21 @@ char *taosStrdup(const char *ptr) { #endif } -void taosMemoryFree(void *ptr) { - if (NULL == ptr) return; -#ifdef USE_TD_MEMORY - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); - if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { - pTdMemoryInfo->memorySize = 0; - // memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo)); - free(pTdMemoryInfo); - } else { - free(ptr); - } -#else - return free(ptr); -#endif -} +// void taosMemoryFree(void *ptr) { +// if (NULL == ptr) return; +// #ifdef USE_TD_MEMORY +// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); +// if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { +// pTdMemoryInfo->memorySize = 0; +// // memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo)); +// free(pTdMemoryInfo); +// } else { +// free(ptr); +// } +// #else +// return free(ptr); +// #endif +// } int64_t taosMemorySize(void *ptr) { if (ptr == NULL) return 0; @@ -356,8 +357,9 @@ int64_t taosMemorySize(void *ptr) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { -+ return NULL; -+ } + +return NULL; + + + } return pTdMemoryInfo->memorySize; #else @@ -380,12 +382,12 @@ void taosMemoryTrim(int32_t size) { #endif } -void* taosMemoryMallocAlign(uint32_t alignment, int64_t size) { +void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) { #ifdef USE_TD_MEMORY ASSERT(0); #else #if defined(LINUX) - void* p = memalign(alignment, size); + void *p = memalign(alignment, size); return p; #else return taosMemoryMalloc(size);