提交 8be1568d 编写于 作者: L lihui

Merge branch 'develop' into feature/lihui

...@@ -125,7 +125,9 @@ IF (NOT DEFINED TD_CLUSTER) ...@@ -125,7 +125,9 @@ IF (NOT DEFINED TD_CLUSTER)
# debug flag # debug flag
# #
# ADD_DEFINITIONS(-D_CHECK_HEADER_FILE_) # ADD_DEFINITIONS(-D_CHECK_HEADER_FILE_)
# ADD_DEFINITIONS(-D_TAOS_MEM_TEST_) IF (${MEM_CHECK} MATCHES "true")
ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
ENDIF ()
IF (TD_CLUSTER) IF (TD_CLUSTER)
ADD_DEFINITIONS(-DCLUSTER) ADD_DEFINITIONS(-DCLUSTER)
......
...@@ -34,8 +34,8 @@ extern "C" { ...@@ -34,8 +34,8 @@ extern "C" {
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tlog.h" #include "tlog.h"
#include "tscCache.h" #include "tscCache.h"
#include "tsdb.h"
#include "tscSQLParser.h" #include "tscSQLParser.h"
#include "tsdb.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h" #include "tutil.h"
...@@ -247,7 +247,7 @@ typedef struct STableDataBlocks { ...@@ -247,7 +247,7 @@ typedef struct STableDataBlocks {
// for parameter ('?') binding // for parameter ('?') binding
uint32_t numOfAllocedParams; uint32_t numOfAllocedParams;
uint32_t numOfParams; uint32_t numOfParams;
SParamInfo* params; SParamInfo *params;
} STableDataBlocks; } STableDataBlocks;
typedef struct SDataBlockList { typedef struct SDataBlockList {
...@@ -262,18 +262,17 @@ typedef struct SDataBlockList { ...@@ -262,18 +262,17 @@ typedef struct SDataBlockList {
typedef struct { typedef struct {
SOrderVal order; SOrderVal order;
int command; int command;
int count;// TODO refactor
// TODO refactor
int count;
int16_t isInsertFromFile; // load data from file or not
union { union {
bool existsCheck; bool existsCheck; // check if the table exists
int8_t showType; int8_t showType; // show command type
int8_t isInsertFromFile; // load data from file or not
}; };
bool import; // import/insert type
char msgType; char msgType;
uint16_t type; uint16_t type; // query type
char intervalTimeUnit; char intervalTimeUnit;
int64_t etime, stime; int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval int64_t nAggTimeInterval; // aggregation time interval
...@@ -450,12 +449,6 @@ int taos_retrieve(TAOS_RES *res); ...@@ -450,12 +449,6 @@ int taos_retrieve(TAOS_RES *res);
int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd); int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd);
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd); void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
/**
* release both metric/meter meta information
* @param pCmd SSqlCmd object that contains the metric/meter meta info
*/
void tscClearSqlMetaInfo(SSqlCmd *pCmd);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd); void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes); int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes);
...@@ -483,8 +476,8 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql); ...@@ -483,8 +476,8 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
int32_t tscBuildResultsForEmptyRetrieval(SSqlObj *pSql);
bool tscIsUpdateQuery(STscObj *pObj); bool tscIsUpdateQuery(STscObj *pObj);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct // transfer SSqlInfo to SqlCmd struct
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
......
...@@ -9,6 +9,22 @@ extern "C" { ...@@ -9,6 +9,22 @@ extern "C" {
#endif #endif
#undef com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE #undef com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE
#define com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE 0LL #define com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE 0LL
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method:
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp
(JNIEnv *, jclass, jint, jstring, jboolean);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method:
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp
(JNIEnv *, jclass);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: initImp * Method: initImp
......
...@@ -111,6 +111,20 @@ void jniGetGlobalMethod(JNIEnv *env) { ...@@ -111,6 +111,20 @@ void jniGetGlobalMethod(JNIEnv *env) {
jniTrace("native method register finished"); jniTrace("native method register finished");
} }
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode, jstring jPath, jboolean jAutoDump) {
if (jPath != NULL) {
const char *path = (*env)->GetStringUTFChars(env, jPath, NULL);
taosSetAllocMode(jMode, path, !!jAutoDump);
(*env)->ReleaseStringUTFChars(env, jPath, path);
} else {
taosSetAllocMode(jMode, NULL, !!jAutoDump);
}
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp(JNIEnv *env, jobject jobj) {
taosDumpMemoryLeak();
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp(JNIEnv *env, jobject jobj, jstring jconfigDir) { JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp(JNIEnv *env, jobject jobj, jstring jconfigDir) {
if (jconfigDir != NULL) { if (jconfigDir != NULL) {
const char *confDir = (*env)->GetStringUTFChars(env, jconfigDir, NULL); const char *confDir = (*env)->GetStringUTFChars(env, jconfigDir, NULL);
......
...@@ -40,6 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -40,6 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
*/ */
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
...@@ -58,14 +59,13 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -58,14 +59,13 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
taosNotePrintTsc(sqlstr); taosNotePrintTsc(sqlstr);
SSqlObj *pSql = (SSqlObj *)malloc(sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param);
return; return;
} }
memset(pSql, 0, sizeof(SSqlObj));
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
此差异已折叠。
此差异已折叠。
...@@ -746,7 +746,7 @@ void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken ...@@ -746,7 +746,7 @@ void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken
} }
void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->numOfBlocksPerTable = -1; pDBInfo->numOfBlocksPerTable = 50;
pDBInfo->compressionLevel = -1; pDBInfo->compressionLevel = -1;
pDBInfo->commitLog = -1; pDBInfo->commitLog = -1;
......
...@@ -123,6 +123,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) { ...@@ -123,6 +123,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
return memcmp(p1, p2, size) == 0; return memcmp(p1, p2, size) == 0;
} }
//todo refactor
static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) { static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delimiter) { while (*input != 0 && *input++ != delimiter) {
......
...@@ -1415,7 +1415,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { ...@@ -1415,7 +1415,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pMsg = pStart; pMsg = pStart;
pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->import = pSql->cmd.order.order; pShellMsg->import = pSql->cmd.import;
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted
...@@ -3453,31 +3453,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -3453,31 +3453,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void doDecompressPayload(SSqlCmd *pCmd, SSqlRes *pRes, int16_t compressed) {
if (compressed && pRes->numOfRows > 0) {
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;
int32_t numOfTotalCols = pCmd->fieldsInfo.numOfOutputCols + pCmd->fieldsInfo.numOfHiddenCols;
int32_t rowSize = pCmd->fieldsInfo.pOffset[numOfTotalCols - 1] + pCmd->fieldsInfo.pFields[numOfTotalCols - 1].bytes;
// TODO handle the OOM problem
char * buf = malloc(rowSize * pRes->numOfRows);
int32_t payloadSize = pRes->rspLen - 1 - sizeof(SRetrieveMeterRsp);
assert(payloadSize > 0);
int32_t decompressedSize = tsDecompressString(pRetrieve->data, payloadSize, 1, buf, rowSize * pRes->numOfRows, 0, 0, 0);
assert(decompressedSize == rowSize * pRes->numOfRows);
pRes->pRsp = realloc(pRes->pRsp, pRes->rspLen - payloadSize + decompressedSize);
memcpy(pRes->pRsp + sizeof(SRetrieveMeterRsp), buf, decompressedSize);
free(buf);
}
pRes->data = ((SRetrieveMeterRsp *)pRes->pRsp)->data;
}
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -3490,9 +3465,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -3490,9 +3465,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRetrieve->compress = htons(pRetrieve->compress); pRes->data = pRetrieve->data;
doDecompressPayload(pCmd, pRes, pRetrieve->compress);
tscSetResultPointer(pCmd, pRes); tscSetResultPointer(pCmd, pRes);
pRes->row = 0; pRes->row = 0;
......
...@@ -246,7 +246,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { ...@@ -246,7 +246,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
tscDoQuery(pSql); tscDoQuery(pSql);
} }
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
} else {
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
}
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql); tscFreeSqlObjPartial(pSql);
} }
...@@ -266,8 +271,9 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -266,8 +271,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
size_t sqlLen = strlen(sqlstr); size_t sqlLen = strlen(sqlstr);
if (sqlLen > TSDB_MAX_SQL_LEN) { if (sqlLen > TSDB_MAX_SQL_LEN) {
tscError("%p sql too long", pSql); pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
pRes->code = TSDB_CODE_INVALID_SQL; tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code; return pRes->code;
} }
...@@ -276,8 +282,9 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -276,8 +282,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
void *sql = realloc(pSql->sqlstr, sqlLen + 1); void *sql = realloc(pSql->sqlstr, sqlLen + 1);
if (sql == NULL) { if (sql == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code; return pRes->code;
} }
...@@ -777,9 +784,9 @@ int taos_errno(TAOS *taos) { ...@@ -777,9 +784,9 @@ int taos_errno(TAOS *taos) {
} }
char *taos_errstr(TAOS *taos) { char *taos_errstr(TAOS *taos) {
STscObj * pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
unsigned char code; uint8_t code;
char temp[256] = {0}; // char temp[256] = {0};
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode];
...@@ -788,9 +795,10 @@ char *taos_errstr(TAOS *taos) { ...@@ -788,9 +795,10 @@ char *taos_errstr(TAOS *taos) {
else else
code = pObj->pSql->res.code; code = pObj->pSql->res.code;
// for invalid sql, additional information is attached to explain why the sql is invalid
if (code == TSDB_CODE_INVALID_SQL) { if (code == TSDB_CODE_INVALID_SQL) {
snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload); // snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
strcpy(pObj->pSql->cmd.payload, temp); // strcpy(pObj->pSql->cmd.payload, temp);
return pObj->pSql->cmd.payload; return pObj->pSql->cmd.payload;
} else { } else {
return tsError[code]; return tsError[code];
......
...@@ -1294,8 +1294,7 @@ int32_t tscValidateName(SSQLToken* pToken) { ...@@ -1294,8 +1294,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
// re-build the whole name string // re-build the whole name string
if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) { if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
// first part do not have quote // first part do not have quote do nothing
// do nothing
} else { } else {
pStr[firstPartLen] = TS_PATH_DELIMITER[0]; pStr[firstPartLen] = TS_PATH_DELIMITER[0];
memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n); memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
...@@ -1842,5 +1841,30 @@ bool tscIsUpdateQuery(STscObj* pObj) { ...@@ -1842,5 +1841,30 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd* pCmd = &pObj->pSql->cmd; SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0; TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0;
}
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql) {
const char *msgFormat1 = "invalid SQL: %s";
const char *msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
const char *msgFormat3 = "invalid SQL: syntax error near \"%s\"";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(additionalInfo != NULL);
sprintf(msg, msgFormat1, additionalInfo);
return TSDB_CODE_INVALID_SQL;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (additionalInfo != NULL) {
sprintf(msg, msgFormat2, buf, additionalInfo);
} else {
sprintf(msg, msgFormat3, buf); // no additional information for invalid sql error
}
return TSDB_CODE_INVALID_SQL;
} }
...@@ -12,15 +12,25 @@ ...@@ -12,15 +12,25 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package taosSql package taosSql
/*
#cgo CFLAGS : -I/usr/include
#include <stdlib.h>
#cgo LDFLAGS: -L/usr/lib -ltaos
void taosSetAllocMode(int mode, const char* path, _Bool autoDump);
void taosDumpMemoryLeak();
*/
import "C"
import ( import (
"database/sql/driver" "database/sql/driver"
"errors" "errors"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe"
) )
// Returns the bool value of the input. // Returns the bool value of the input.
...@@ -398,3 +408,15 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { ...@@ -398,3 +408,15 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
} }
/******************************************************************************
* Utils for C memory issues debugging *
******************************************************************************/
func SetAllocMode(mode int32, path string) {
cpath := C.CString(path)
defer C.free(unsafe.Pointer(cpath))
C.taosSetAllocMode(C.int(mode), cpath, false)
}
func DumpMemoryLeak() {
C.taosDumpMemoryLeak()
}
...@@ -568,7 +568,6 @@ typedef struct { ...@@ -568,7 +568,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t numOfRows; int32_t numOfRows;
int16_t precision; int16_t precision;
int16_t compress;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
char data[]; char data[];
......
...@@ -187,18 +187,35 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, unsigned int inLen, cha ...@@ -187,18 +187,35 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, unsigned int inLen, cha
char *taosIpStr(uint32_t ipInt); char *taosIpStr(uint32_t ipInt);
#ifdef _TAOS_MEM_TEST_ #define TAOS_ALLOC_MODE_DEFAULT 0
// Use during test to simulate the success and failure scenarios of memory allocation #define TAOS_ALLOC_MODE_RANDOM_FAIL 1
extern void* taos_malloc(unsigned int size, char* _func); #define TAOS_ALLOC_MODE_DETECT_LEAK 2
extern void* taos_calloc(unsigned int num, unsigned int size, char* _func); void taosSetAllocMode(int mode, const char* path, bool autoDump);
extern void* taos_realloc(void* ptr, unsigned int size, char* _func); void taosDumpMemoryLeak();
extern void taos_free(void* ptr);
#define malloc(size) taos_malloc(size, __FUNCTION__) #ifdef TAOS_MEM_CHECK
#define calloc(num, size) taos_calloc(num, size, __FUNCTION__)
#define realloc(ptr, size) taos_realloc(ptr, size, __FUNCTION__) void * taos_malloc(size_t size, const char *file, uint32_t line);
#define free(ptr) taos_free(ptr) void * taos_calloc(size_t num, size_t size, const char *file, uint32_t line);
#endif void * taos_realloc(void *ptr, size_t size, const char *file, uint32_t line);
void taos_free(void *ptr, const char *file, uint32_t line);
char * taos_strdup(const char *str, const char *file, uint32_t line);
char * taos_strndup(const char *str, size_t size, const char *file, uint32_t line);
ssize_t taos_getline(char **lineptr, size_t *n, FILE *stream, const char *file, uint32_t line);
#ifndef TAOS_MEM_CHECK_IMPL
#define malloc(size) taos_malloc(size, __FILE__, __LINE__)
#define calloc(num, size) taos_calloc(num, size, __FILE__, __LINE__)
#define realloc(ptr, size) taos_realloc(ptr, size, __FILE__, __LINE__)
#define free(ptr) taos_free(ptr, __FILE__, __LINE__)
#define strdup(str) taos_strdup(str, __FILE__, __LINE__)
#define strndup(str, size) taos_strndup(str, size, __FILE__, __LINE__)
#define getline(lineptr, n, stream) taos_getline(lineptr, n, stream, __FILE__, __LINE__)
#endif // TAOS_MEM_CHECK_IMPL
#endif // TAOS_MEM_CHECK
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -81,6 +81,7 @@ bool taosGetProcMemory(float *memoryUsedMB) { ...@@ -81,6 +81,7 @@ bool taosGetProcMemory(float *memoryUsedMB) {
char * line = NULL; char * line = NULL;
while (!feof(fp)) { while (!feof(fp)) {
tfree(line); tfree(line);
len = 0;
getline(&line, &len, fp); getline(&line, &len, fp);
if (line == NULL) { if (line == NULL) {
break; break;
...@@ -137,7 +138,7 @@ bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { ...@@ -137,7 +138,7 @@ bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
return false; return false;
} }
size_t len; size_t len = 0;
char * line = NULL; char * line = NULL;
getline(&line, &len, fp); getline(&line, &len, fp);
if (line == NULL) { if (line == NULL) {
...@@ -409,6 +410,7 @@ bool taosGetCardInfo(int64_t *bytes) { ...@@ -409,6 +410,7 @@ bool taosGetCardInfo(int64_t *bytes) {
while (!feof(fp)) { while (!feof(fp)) {
tfree(line); tfree(line);
len = 0;
getline(&line, &len, fp); getline(&line, &len, fp);
if (line == NULL) { if (line == NULL) {
break; break;
...@@ -480,6 +482,7 @@ bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { ...@@ -480,6 +482,7 @@ bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
while (!feof(fp)) { while (!feof(fp)) {
tfree(line); tfree(line);
len = 0;
getline(&line, &len, fp); getline(&line, &len, fp);
if (line == NULL) { if (line == NULL) {
break; break;
......
...@@ -353,9 +353,34 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -353,9 +353,34 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row)) if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row))
if (sdbGetRow(handle, row)) { if (sdbGetRow(handle, row)) {
sdbError("table:%s, failed to insert record, sdbVersion:%d", pTable->name, sdbVersion); if (strcmp(pTable->name, "mnode") == 0) {
/*
* The first mnode created when the system just start, so the insert action may failed
* see sdbPeer.c : sdbInitPeers
*/
pTable->id++;
sdbVersion++;
sdbPrint("table:%s, record:%s already exist, think it successed, sdbVersion:%ld id:%d",
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id);
return 0;
} else {
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, (char *)row, sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_AUTO:
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, *(int32_t *)row, sdbVersion, pTable->id);
break;
default:
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, sdbVersion, pTable->id);
break;
}
return -1; return -1;
} }
}
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
...@@ -562,7 +587,24 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -562,7 +587,24 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
if (pTable == NULL || row == NULL) return -1; if (pTable == NULL || row == NULL) return -1;
pMeta = sdbGetRowMeta(handle, row); pMeta = sdbGetRowMeta(handle, row);
if (pMeta == NULL) { if (pMeta == NULL) {
sdbTrace("table:%s, record is not there, update failed", pTable->name); switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbError("table:%s, failed to update record:%s, record is not there, sdbVersion:%ld id:%d",
pTable->name, (char *) row, sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbError("table:%s, failed to update record:%s record is not there, sdbVersion:%ld id:%d",
pTable->name, taosIpStr(*(int32_t *) row), sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_AUTO:
sdbError("table:%s, failed to update record:F%s record is not there, sdbVersion:%ld id:%d",
pTable->name, *(int32_t *) row, sdbVersion, pTable->id);
break;
default:
sdbError("table:%s, failed to update record:%s record is not there, sdbVersion:%ld id:%d",
pTable->name, sdbVersion, pTable->id);
break;
}
return -1; return -1;
} }
...@@ -623,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -623,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
break; break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d", sdbTrace("table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d",
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows); pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows);
break; break;
case SDB_KEYTYPE_AUTO: case SDB_KEYTYPE_AUTO:
......
...@@ -353,7 +353,7 @@ bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg); ...@@ -353,7 +353,7 @@ bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows); int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows);
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size); int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows);
int64_t vnodeGetOffsetVal(void *thandle); int64_t vnodeGetOffsetVal(void *thandle);
......
...@@ -61,6 +61,20 @@ int main(int argc, char *argv[]) { ...@@ -61,6 +61,20 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} else if (strcmp(argv[i], "-k") == 0) { } else if (strcmp(argv[i], "-k") == 0) {
dnodeParseParameterK(); dnodeParseParameterK();
#ifdef TAOS_MEM_CHECK
} else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
if ((i < argc - 1) && (argv[i+1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true);
}
} else if (strcmp(argv[i], "--detect-mem-leak") == 0) {
if ((i < argc - 1) && (argv[i+1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true);
}
#endif
} }
} }
......
...@@ -519,10 +519,8 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -519,10 +519,8 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
pMeter = mgmtGetMeter(pCreate->meterId); pMeter = mgmtGetMeter(pCreate->meterId);
if (pMeter) { if (pMeter) {
if (pCreate->igExists) { if (pCreate->igExists) {
mError("table:%s, igExists is true", pCreate->meterId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
mError("table:%s, table is already exist", pCreate->meterId);
return TSDB_CODE_TABLE_ALREADY_EXIST; return TSDB_CODE_TABLE_ALREADY_EXIST;
} }
} }
......
...@@ -6943,37 +6943,19 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p ...@@ -6943,37 +6943,19 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
return numOfRes; return numOfRes;
} }
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int32_t *size) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SMeterObj *pObj = pQInfo->pObj; SMeterObj *pObj = pQInfo->pObj;
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
int32_t dataSize = pQInfo->query.rowSize * numOfRows;
if (dataSize >= tsCompressMsgSize && tsCompressMsgSize > 0) {
char *compBuf = malloc((size_t)dataSize);
// for metric query, bufIndex always be 0. // for metric query, bufIndex always be 0.
char *d = compBuf;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
memmove(d, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
d += bytes * numOfRows;
}
*size = tsCompressString(compBuf, dataSize, 1, data, dataSize + EXTRA_BYTES, 0, 0, 0);
dTrace("QInfo:%p compress rsp msg, before:%d, after:%d", pQInfo, dataSize, *size);
free(compBuf);
} else { // for metric query, bufIndex always be 0.
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes; int32_t bytes = pQuery->pSelectExpr[col].resBytes;
memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows); memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
data += bytes * numOfRows; data += bytes * numOfRows;
} }
}
} }
/** /**
...@@ -6987,7 +6969,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -6987,7 +6969,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
* @param numOfRows the number of rows that are not returned in current retrieve * @param numOfRows the number of rows that are not returned in current retrieve
* @return * @return
*/ */
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size) { int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) {
SQInfo *pQInfo = (SQInfo *)handle; SQInfo *pQInfo = (SQInfo *)handle;
SQuery *pQuery = &pQInfo->query; SQuery *pQuery = &pQInfo->query;
...@@ -7000,7 +6982,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i ...@@ -7000,7 +6982,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
// make sure file exist // make sure file exist
if (VALIDFD(fd)) { if (VALIDFD(fd)) {
size_t s = lseek(fd, 0, SEEK_END); size_t s = lseek(fd, 0, SEEK_END);
dTrace("QInfo:%p ts comp data return, file:%s, size:%ld", pQInfo, pQuery->sdata[0]->data, size); dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
lseek(fd, 0, SEEK_SET); lseek(fd, 0, SEEK_SET);
read(fd, data, s); read(fd, data, s);
...@@ -7012,7 +6994,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i ...@@ -7012,7 +6994,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
pQuery->sdata[0]->data, strerror(errno)); pQuery->sdata[0]->data, strerror(errno));
} }
} else { } else {
doCopyQueryResultToMsg(pQInfo, numOfRows, data, size); doCopyQueryResultToMsg(pQInfo, numOfRows, data);
} }
return numOfRows; return numOfRows;
......
...@@ -850,7 +850,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { ...@@ -850,7 +850,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
// the remained number of retrieved rows, not the interpolated result // the remained number of retrieved rows, not the interpolated result
int numOfRows = pQInfo->pointsRead - pQInfo->pointsReturned; int numOfRows = pQInfo->pointsRead - pQInfo->pointsReturned;
int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows, size); int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows);
pQInfo->pointsReturned += numOfFinal; pQInfo->pointsReturned += numOfFinal;
dTrace("QInfo:%p %d are returned, totalReturned:%d totalRead:%d", pQInfo, numOfFinal, pQInfo->pointsReturned, dTrace("QInfo:%p %d are returned, totalReturned:%d totalRead:%d", pQInfo, numOfFinal, pQInfo->pointsReturned,
...@@ -862,12 +862,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { ...@@ -862,12 +862,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
/* /*
* If SQInfo has been released, the value of signature cannot be equalled to * If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
* the address of pQInfo, since in release function, the original value has * since in release function, the original value has been destroyed. However, this memory area may be reused
* been * by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
* destroyed. However, this memory area may be reused by another function.
* It may be 0 or any value, but it is rarely still be equalled to the address
* of SQInfo.
*/ */
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
......
...@@ -452,11 +452,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -452,11 +452,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
pMsg = pRsp->data; pMsg = pRsp->data;
if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) { if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
int32_t oldSize = size;
vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size); vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size);
if (oldSize > size) {
pRsp->compress = htons(1); // denote that the response msg is compressed
}
} }
pMsg += size; pMsg += size;
...@@ -573,6 +569,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -573,6 +569,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int sversion = htonl(pBlocks->sversion); int sversion = htonl(pBlocks->sversion);
if (pSubmit->import) { if (pSubmit->import) {
dTrace("start to import data");
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now); sversion, &numOfPoints, now);
} else { } else {
......
...@@ -16,63 +16,462 @@ ...@@ -16,63 +16,462 @@
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#define TAOS_MEM_CHECK_IMPL
#include "tutil.h"
#ifdef TAOS_MEM_CHECK
static int allocMode = TAOS_ALLOC_MODE_DEFAULT;
static FILE* fpAllocLog = NULL;
////////////////////////////////////////////////////////////////////////////////
// memory allocator which fails randomly
extern int32_t taosGetTimestampSec(); extern int32_t taosGetTimestampSec();
static int32_t startTime = 0; static int32_t startTime = INT32_MAX;;
static int64_t m_curLimit = 100*1024;
bool isMallocMem(unsigned int size, char* _func) { static bool random_alloc_fail(size_t size, const char* file, uint32_t line) {
if (0 == startTime) { if (taosGetTimestampSec() < startTime) {
startTime = taosGetTimestampSec(); return false;
return true; }
} else {
int32_t currentTime = taosGetTimestampSec(); if (size < 100 * (size_t)1024) {
if (currentTime - startTime < 10) return true; return false;
} }
if (size > m_curLimit) { if (rand() % 20 != 0) {
if (3 == rand() % 20) {
pTrace("====no alloc mem in func: %s, size:%d", _func, size);
return false; return false;
} }
if (fpAllocLog != NULL) {
fprintf(fpAllocLog, "%s:%d: memory allocation of %zu bytes will fail.\n", file, line, size);
} }
return true; return true;
} }
void* taos_malloc(unsigned int size, char* _func) { static void* malloc_random(size_t size, const char* file, uint32_t line) {
return random_alloc_fail(size, file, line) ? NULL : malloc(size);
}
if (false == isMallocMem(size, _func)) { static void* calloc_random(size_t num, size_t size, const char* file, uint32_t line) {
return NULL; return random_alloc_fail(num * size, file, line) ? NULL : calloc(num, size);
}
static void* realloc_random(void* ptr, size_t size, const char* file, uint32_t line) {
return random_alloc_fail(size, file, line) ? NULL : realloc(ptr, size);
}
static char* strdup_random(const char* str, const char* file, uint32_t line) {
size_t len = strlen(str);
return random_alloc_fail(len + 1, file, line) ? NULL : strdup(str);
}
static char* strndup_random(const char* str, size_t size, const char* file, uint32_t line) {
size_t len = strlen(str);
if (len > size) {
len = size;
} }
return random_alloc_fail(len + 1, file, line) ? NULL : strndup(str, len);
}
void *p = NULL; static ssize_t getline_random(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
p = malloc(size); return random_alloc_fail(*n, file, line) ? -1 : getline(lineptr, n, stream);
return p; }
////////////////////////////////////////////////////////////////////////////////
// memory allocator with leak detection
#define MEMBLK_MAGIC 0x55AA
typedef struct SMemBlock {
const char* file;
uint16_t line;
uint16_t magic;
uint32_t size;
struct SMemBlock* prev;
struct SMemBlock* next;
// TODO: need pading in 32bit platform
char data[0];
} SMemBlock;
static SMemBlock *blocks = NULL;
static uintptr_t lock = 0;
static void add_mem_block(SMemBlock* blk) {
blk->prev = NULL;
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
blk->next = blocks;
if (blocks != NULL) {
blocks->prev = blk;
}
blocks = blk;
atomic_store_ptr(&lock, 0);
}
static void remove_mem_block(SMemBlock* blk) {
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
if (blocks == blk) {
blocks = blk->next;
}
if (blk->prev != NULL) {
blk->prev->next = blk->next;
}
if (blk->next != NULL) {
blk->next->prev = blk->prev;
}
atomic_store_ptr(&lock, 0);
blk->prev = NULL;
blk->next = NULL;
}
static void free_detect_leak(void* ptr, const char* file, uint32_t line) {
if (ptr == NULL) {
return;
}
SMemBlock* blk = (SMemBlock*)(((char*)ptr) - sizeof(SMemBlock));
if (blk->magic != MEMBLK_MAGIC) {
if (fpAllocLog != NULL) {
fprintf(fpAllocLog, "%s:%d: memory is allocated by default allocator.\n", file, line);
}
free(ptr);
return;
}
remove_mem_block(blk);
free(blk);
} }
void* taos_calloc(unsigned int num, unsigned int size, char* _func) { static void* malloc_detect_leak(size_t size, const char* file, uint32_t line) {
if (size == 0) {
return NULL;
}
if (false == isMallocMem(size, _func)) { SMemBlock *blk = (SMemBlock*)malloc(size + sizeof(SMemBlock));
if (blk == NULL) {
return NULL; return NULL;
} }
void *p = NULL; if (line > UINT16_MAX && fpAllocLog != NULL) {
p = calloc(num, size); fprintf(fpAllocLog, "%s:%d: line number too large.\n", file, line);
}
if (size > UINT32_MAX && fpAllocLog != NULL) {
fprintf(fpAllocLog, "%s:%d: size too large: %zu.\n", file, line, size);
}
blk->file = file;
blk->line = (uint16_t)line;
blk->magic = MEMBLK_MAGIC;
blk->size = size;
add_mem_block(blk);
return blk->data;
}
static void* calloc_detect_leak(size_t num, size_t size, const char* file, uint32_t line) {
size *= num;
void* p = malloc_detect_leak(size, file, line);
if (p != NULL) {
memset(p, 0, size);
}
return p; return p;
} }
void* taos_realloc(void* ptr, unsigned int size, char* _func) { static void* realloc_detect_leak(void* ptr, size_t size, const char* file, uint32_t line) {
if (size == 0) {
free_detect_leak(ptr, file, line);
return NULL;
}
if (ptr == NULL) {
return malloc_detect_leak(size, file, line);
}
if (false == isMallocMem(size, _func)) { SMemBlock* blk = ((char*)ptr) - sizeof(SMemBlock);
if (blk->magic != MEMBLK_MAGIC) {
if (fpAllocLog != NULL) {
fprintf(fpAllocLog, "%s:%d: memory is allocated by default allocator.\n", file, line);
}
return realloc(ptr, size);
}
remove_mem_block(blk);
void* p = realloc(blk, size + sizeof(SMemBlock));
if (p == NULL) {
add_mem_block(blk);
return NULL; return NULL;
} }
void *p = NULL; if (size > UINT32_MAX && fpAllocLog != NULL) {
p = realloc(ptr, size); fprintf(fpAllocLog, "%s:%d: size too large: %zu.\n", file, line, size);
}
blk = (SMemBlock*)p;
blk->size = size;
add_mem_block(blk);
return blk->data;
}
static char* strdup_detect_leak(const char* str, const char* file, uint32_t line) {
size_t len = strlen(str);
char *p = malloc_detect_leak(len + 1, file, line);
if (p != NULL) {
memcpy(p, str, len);
p[len] = 0;
}
return p; return p;
} }
void taos_free(void* ptr) { static char* strndup_detect_leak(const char* str, size_t size, const char* file, uint32_t line) {
free(ptr); size_t len = strlen(str);
if (len > size) {
len = size;
}
char *p = malloc_detect_leak(len + 1, file, line);
if (p != NULL) {
memcpy(p, str, len);
p[len] = 0;
}
return p;
}
static ssize_t getline_detect_leak(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
char* buf = NULL;
size_t bufSize = 0;
ssize_t size = getline(&buf, &bufSize, stream);
if (size != -1) {
if (*n < size + 1) {
void* p = realloc_detect_leak(*lineptr, size + 1, file, line);
if (p == NULL) {
free(buf);
return -1;
}
*lineptr = (char*)p;
*n = size + 1;
}
memcpy(*lineptr, buf, size + 1);
}
free(buf);
return size;
}
static void dump_memory_leak() {
const char* hex = "0123456789ABCDEF";
const char* fmt = ":%d: addr=%p, size=%d, content(first 16 bytes)=";
size_t numOfBlk = 0, totalSize = 0;
if (fpAllocLog == NULL) {
return;
}
fputs("memory blocks allocated but not freed before exit:\n", fpAllocLog);
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
for (SMemBlock* blk = blocks; blk != NULL; blk = blk->next) {
++numOfBlk;
totalSize += blk->size;
fputs(blk->file, fpAllocLog);
fprintf(fpAllocLog, fmt, blk->line, blk->data, blk->size);
char sep = '\'';
size_t size = blk->size > 16 ? 16 : blk->size;
for (size_t i = 0; i < size; ++i) {
uint8_t c = (uint8_t)(blk->data[i]);
fputc(sep, fpAllocLog);
sep = ' ';
fputc(hex[c >> 4], fpAllocLog);
fputc(hex[c & 0x0f], fpAllocLog);
}
fputs("'\n", fpAllocLog);
}
atomic_store_ptr(&lock, 0);
fprintf(fpAllocLog, "\nnumber of blocks: %zu, total bytes: %zu\n", numOfBlk, totalSize);
fflush(fpAllocLog);
}
static void dump_memory_leak_on_sig(int sig) {
fprintf(fpAllocLog, "signal %d received.\n", sig);
// restore default signal handler
struct sigaction act = {0};
act.sa_handler = SIG_DFL;
sigaction(sig, &act, NULL);
dump_memory_leak();
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
void* taos_malloc(size_t size, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return malloc(size);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return malloc_random(size, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return malloc_detect_leak(size, file, line);
}
return malloc(size);
}
void* taos_calloc(size_t num, size_t size, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return calloc(num, size);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return calloc_random(num, size, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return calloc_detect_leak(num, size, file, line);
}
return calloc(num, size);
}
void* taos_realloc(void* ptr, size_t size, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return realloc(ptr, size);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return realloc_random(ptr, size, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return realloc_detect_leak(ptr, size, file, line);
}
return realloc(ptr, size);
}
void taos_free(void* ptr, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return free(ptr);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return free(ptr);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return free_detect_leak(ptr, file, line);
}
return free(ptr);
}
char* taos_strdup(const char* str, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return strdup(str);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return strdup_random(str, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return strdup_detect_leak(str, file, line);
}
return strdup(str);
}
char* taos_strndup(const char* str, size_t size, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return strndup(str, size);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return strndup_random(str, size, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return strndup_detect_leak(str, size, file, line);
}
return strndup(str, size);
}
ssize_t taos_getline(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT:
return getline(lineptr, n, stream);
case TAOS_ALLOC_MODE_RANDOM_FAIL:
return getline_random(lineptr, n, stream, file, line);
case TAOS_ALLOC_MODE_DETECT_LEAK:
return getline_detect_leak(lineptr, n, stream, file, line);
}
return getline(lineptr, n, stream);
}
static void close_alloc_log() {
if (fpAllocLog != NULL) {
if (fpAllocLog != stdout) {
fclose(fpAllocLog);
}
fpAllocLog = NULL;
}
}
void taosSetAllocMode(int mode, const char* path, bool autoDump) {
assert(mode >= TAOS_ALLOC_MODE_DEFAULT);
assert(mode <= TAOS_ALLOC_MODE_DETECT_LEAK);
if (fpAllocLog != NULL || allocMode != TAOS_ALLOC_MODE_DEFAULT) {
printf("memory allocation mode can only be set once.\n");
return;
}
if (path == NULL || path[0] == 0) {
fpAllocLog = stdout;
} else if ((fpAllocLog = fopen(path, "w")) != NULL) {
atexit(close_alloc_log);
} else {
printf("failed to open memory allocation log file '%s', errno=%d\n", path, errno);
return;
}
allocMode = mode;
if (mode == TAOS_ALLOC_MODE_RANDOM_FAIL) {
startTime = taosGetTimestampSec() + 10;
return;
}
if (autoDump && mode == TAOS_ALLOC_MODE_DETECT_LEAK) {
atexit(dump_memory_leak);
struct sigaction act = {0};
act.sa_handler = dump_memory_leak_on_sig;
sigaction(SIGFPE, &act, NULL);
sigaction(SIGSEGV, &act, NULL);
sigaction(SIGILL, &act, NULL);
}
}
void taosDumpMemoryLeak() {
dump_memory_leak();
close_alloc_log();
}
#else // 'TAOS_MEM_CHECK' not defined
void taosSetAllocMode(int mode, const char* path, bool autoDump) {
// do nothing
}
void taosDumpMemoryLeak() {
// do nothing
} }
#endif // TAOS_MEM_CHECK
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册