提交 86fe2169 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 ca5f813c
......@@ -24,25 +24,31 @@ extern "C" {
// When you want to use this feature, you should find or add the same function in the following sectio
#if !defined(WINDOWS)
#ifndef ALLOW_FORBID_FUNC
#define malloc MALLOC_FUNC_TAOS_FORBID
#define calloc CALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID
// #ifndef ALLOW_FORBID_FUNC
// #define malloc MALLOC_FUNC_TAOS_FORBID
// #define calloc CALLOC_FUNC_TAOS_FORBID
// #define realloc REALLOC_FUNC_TAOS_FORBID
// #define free FREE_FUNC_TAOS_FORBID
#ifdef strdup
#undef strdup
#define strdup STRDUP_FUNC_TAOS_FORBID
#define strdup STRDUP_FUNC_TAOS_FORBID
#endif
#endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
// #endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
// #define taosMemoryFree malloc
#define taosMemoryMalloc malloc
#define taosMemoryCalloc calloc
#define taosMemoryRealloc realloc
#define taosMemoryFree free
int32_t taosMemoryDbgInit();
int32_t taosMemoryDbgInitRestore();
void *taosMemoryMalloc(int64_t size);
void *taosMemoryCalloc(int64_t num, int64_t size);
void *taosMemoryRealloc(void *ptr, int64_t size);
char *taosStrdup(const char *ptr);
void taosMemoryFree(void *ptr);
// void *taosMemoryMalloc(int64_t size);
// void *taosMemoryCalloc(int64_t num, int64_t size);
// void *taosMemoryRealloc(void *ptr, int64_t size);
char *taosStrdup(const char *ptr);
// void taosMemoryFree(void *ptr);
int64_t taosMemorySize(void *ptr);
void taosPrintBackTrace();
void taosMemoryTrim(int32_t size);
......
......@@ -13,23 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ctgRemote.h"
#include "catalogInt.h"
#include "query.h"
#include "systable.h"
#include "tname.h"
#include "tref.h"
#include "trpc.h"
#include "ctgRemote.h"
typedef void* (*MallocType)(int64_t);
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0;
SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(cbParam->taskId);
SDataBuf taskMsg = *pMsg;
int32_t msgNum = 0;
SBatchRsp batchRsp = {0};
SBatchRspMsg rsp = {0};
SBatchRspMsg *pRsp = NULL;
int32_t code = 0;
SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(cbParam->taskId);
SDataBuf taskMsg = *pMsg;
int32_t msgNum = 0;
SBatchRsp batchRsp = {0};
SBatchRspMsg rsp = {0};
SBatchRspMsg* pRsp = NULL;
if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) {
......@@ -39,7 +41,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
msgNum = taosArrayGetSize(batchRsp.pRsps);
}
if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) {
msgNum = 0;
}
......@@ -582,8 +584,8 @@ _return:
return code;
}
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) {
int32_t num = taosArrayGetSize(pBatch->pMsgs);
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t* pSize) {
int32_t num = taosArrayGetSize(pBatch->pMsgs);
if (num >= CTG_MAX_REQ_IN_BATCH) {
qError("too many msgs %d in one batch request", num);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -599,7 +601,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t
qError("tSerializeSBatchReq failed");
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*msg = taosMemoryCalloc(1, msgSize);
if (NULL == (*msg)) {
qError("calloc batchReq msg failed, size:%d", msgSize);
......@@ -625,7 +627,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
size_t len = 0;
int32_t* vgId = taosHashGetKey(p, &len);
SCtgBatch* pBatch = (SCtgBatch*)p;
int32_t msgSize = 0;
int32_t msgSize = 0;
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
......@@ -654,7 +656,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
......@@ -708,7 +710,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
......@@ -759,7 +761,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
......@@ -811,7 +813,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
......@@ -866,7 +868,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
......@@ -921,7 +923,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
......@@ -978,7 +980,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
......@@ -1033,7 +1035,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user);
......@@ -1093,7 +1095,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
......@@ -1156,7 +1158,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
......@@ -1225,7 +1227,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_VND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
......@@ -1290,7 +1292,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_MND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
......@@ -1342,7 +1344,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
qDebug("try to get svr ver from mnode");
......
......@@ -3,7 +3,6 @@ add_library(stream STATIC ${STREAM_SRC})
target_include_directories(
stream
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -13,6 +12,11 @@ if(${BUILD_WITH_ROCKSDB})
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor
)
target_include_directories(
stream
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
)
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
......
......@@ -254,63 +254,64 @@ int32_t taosMemoryDbgInitRestore() {
#endif
}
void *taosMemoryMalloc(int64_t size) {
#ifdef USE_TD_MEMORY
void *tmp = malloc(size + sizeof(TdMemoryInfo));
if (tmp == NULL) return NULL;
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
pTdMemoryInfo->memorySize = size;
pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
return (char *)tmp + sizeof(TdMemoryInfo);
#else
return malloc(size);
#endif
}
void *taosMemoryCalloc(int64_t num, int64_t size) {
#ifdef USE_TD_MEMORY
int32_t memorySize = num * size;
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
if (tmp == NULL) return NULL;
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
pTdMemoryInfo->memorySize = memorySize;
pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
// void *taosMemoryMalloc(int64_t size) {
// #ifdef USE_TD_MEMORY
// void *tmp = malloc(size + sizeof(TdMemoryInfo));
// if (tmp == NULL) return NULL;
// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
// pTdMemoryInfo->memorySize = size;
// pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
// taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
// return (char *)tmp + sizeof(TdMemoryInfo);
// #else
// return malloc(size);
// #endif
// }
return (char *)tmp + sizeof(TdMemoryInfo);
#else
return calloc(num, size);
#endif
}
// void *taosMemoryCalloc(int64_t num, int64_t size) {
// #ifdef USE_TD_MEMORY
// int32_t memorySize = num * size;
// char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
// if (tmp == NULL) return NULL;
// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
// pTdMemoryInfo->memorySize = memorySize;
// pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
// taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
// return (char *)tmp + sizeof(TdMemoryInfo);
// #else
// return calloc(num, size);
// #endif
// }
void *taosMemoryRealloc(void *ptr, int64_t size) {
#ifdef USE_TD_MEMORY
if (ptr == NULL) return taosMemoryMalloc(size);
// void *taosMemoryRealloc(void *ptr, int64_t size) {
// #ifdef USE_TD_MEMORY
// if (ptr == NULL) return taosMemoryMalloc(size);
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+ return NULL;
+ }
// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
// ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
// if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
// +return NULL;
// +
// }
TdMemoryInfo tdMemoryInfo;
memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo));
// TdMemoryInfo tdMemoryInfo;
// memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo));
void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo));
if (tmp == NULL) return NULL;
// void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo));
// if (tmp == NULL) return NULL;
memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo));
((TdMemoryInfoPtr)tmp)->memorySize = size;
// memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo));
// ((TdMemoryInfoPtr)tmp)->memorySize = size;
return (char *)tmp + sizeof(TdMemoryInfo);
#else
return realloc(ptr, size);
#endif
}
// return (char *)tmp + sizeof(TdMemoryInfo);
// #else
// return realloc(ptr, size);
// #endif
// }
char *taosStrdup(const char *ptr) {
#ifdef USE_TD_MEMORY
......@@ -319,8 +320,8 @@ char *taosStrdup(const char *ptr) {
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
return NULL;
}
return NULL;
}
void *tmp = tstrdup(pTdMemoryInfo);
if (tmp == NULL) return NULL;
......@@ -333,21 +334,21 @@ char *taosStrdup(const char *ptr) {
#endif
}
void taosMemoryFree(void *ptr) {
if (NULL == ptr) return;
#ifdef USE_TD_MEMORY
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
pTdMemoryInfo->memorySize = 0;
// memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo));
free(pTdMemoryInfo);
} else {
free(ptr);
}
#else
return free(ptr);
#endif
}
// void taosMemoryFree(void *ptr) {
// if (NULL == ptr) return;
// #ifdef USE_TD_MEMORY
// TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
// if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
// pTdMemoryInfo->memorySize = 0;
// // memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo));
// free(pTdMemoryInfo);
// } else {
// free(ptr);
// }
// #else
// return free(ptr);
// #endif
// }
int64_t taosMemorySize(void *ptr) {
if (ptr == NULL) return 0;
......@@ -356,8 +357,9 @@ int64_t taosMemorySize(void *ptr) {
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+ return NULL;
+ }
+return NULL;
+
}
return pTdMemoryInfo->memorySize;
#else
......@@ -380,12 +382,12 @@ void taosMemoryTrim(int32_t size) {
#endif
}
void* taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
#ifdef USE_TD_MEMORY
ASSERT(0);
#else
#if defined(LINUX)
void* p = memalign(alignment, size);
void *p = memalign(alignment, size);
return p;
#else
return taosMemoryMalloc(size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册