提交 3c8b6eed 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-13066-3.0

......@@ -226,10 +226,10 @@ endif(${BUILD_WITH_NURAFT})
if(${BUILD_PTHREAD})
set(CMAKE_BUILD_TYPE release)
add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread)
add_subdirectory(pthread EXCLUDE_FROM_ALL)
set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
add_library(pthread STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib)
add_library(pthread INTERFACE)
target_link_libraries(pthread INTERFACE libpthreadVC3)
endif()
# iconv
......
......@@ -29,27 +29,42 @@ extern "C" {
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct SKVIdx SKVIdx;
// STSchema
// STSRow2
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow);
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow);
typedef struct STagVal STagVal;
typedef struct STag STag;
// STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SColVal
#define ColValNONE ((SColVal){.type = COL_VAL_NONE, .nData = 0, .pData = NULL})
#define ColValNULL ((SColVal){.type = COL_VAL_NULL, .nData = 0, .pData = NULL})
#define ColValDATA(nData, pData) ((SColVal){.type = COL_VAL_DATA, .nData = (nData), .pData = (pData)})
// STSRow2
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// STSRowBuilder
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols);
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema);
void tTSRowBuilderClear(STSRowBuilder *pBuilder);
void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
void tTagFree(STag *pTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag);
// STRUCT =================
struct STColumn {
col_id_t colId;
......@@ -68,31 +83,47 @@ struct STSchema {
STColumn columns[];
};
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
struct STSRow2 {
TSKEY ts;
uint32_t flags;
union {
int32_t sver;
int32_t ncols;
};
uint32_t nData;
const uint8_t *pData;
uint8_t flags;
int32_t sver;
uint32_t nData;
uint8_t *pData;
};
struct STSRowBuilder {
STColumn *pTColumn;
STSchema *pTSchema;
int32_t szBitMap1;
int32_t szBitMap2;
int32_t szKVBuf;
uint8_t *pKVBuf;
int32_t szTPBuf;
uint8_t *pTPBuf;
int32_t nCols;
int32_t kvVLen;
int32_t tpVLen;
int32_t iCol;
int32_t vlenKV;
int32_t vlenTP;
STSRow2 row;
};
#if 1 //====================================
typedef enum { COL_VAL_NONE = 0, COL_VAL_NULL = 1, COL_VAL_DATA = 2 } EColValT;
struct SColVal {
EColValT type;
uint32_t nData;
uint8_t *pData;
};
struct STagVal {
int16_t cid;
int8_t type;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
#define TD_SUPPORT_READ2
......
......@@ -258,6 +258,7 @@ typedef struct {
char* tblFName;
int32_t numOfRows;
int32_t affectedRows;
int64_t sver;
} SSubmitBlkRsp;
typedef struct {
......@@ -274,10 +275,10 @@ int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
#define COL_VAL_SET ((int8_t)0x4)
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
#define COL_SET_NULL ((int8_t)0x10)
#define COL_SET_VAL ((int8_t)0x20)
typedef struct SSchema {
int8_t type;
int8_t flags;
......@@ -286,6 +287,9 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define SSCHMEA_TYPE(s) ((s)->type)
......
......@@ -41,8 +41,8 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(const SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc);
......@@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(const SRpcMsg* pMsg);
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);
......
......@@ -456,52 +456,189 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) {
return p;
}
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, const uint8_t* pData, uint32_t nData) {
int n = 0;
uint32_t v = nData;
for (;;) {
if (v <= 0x7f) {
if (p) p[n] = v;
n++;
break;
}
if (p) p[n] = (v & 0x7f) | 0x80;
n++;
v >>= 7;
}
// ===========================================
#define tPutV(p, v) \
do { \
int32_t n = 0; \
for (;;) { \
if (v <= 0x7f) { \
if (p) p[n] = v; \
n++; \
break; \
} \
if (p) p[n] = (v & 0x7f) | 0x80; \
n++; \
v >>= 7; \
} \
return n; \
} while (0)
if (p) {
memcpy(p + n, pData, nData);
}
#define tGetV(p, v) \
do { \
int32_t n = 0; \
if (v) *v = 0; \
for (;;) { \
if (p[n] <= 0x7f) { \
if (v) (*v) |= (p[n] << (7 * n)); \
n++; \
break; \
} \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
n++; \
} \
return n; \
} while (0)
// PUT
static FORCE_INLINE int32_t tPutU8(uint8_t* p, uint8_t v) {
if (p) ((uint8_t*)p)[0] = v;
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) {
if (p) ((int8_t*)p)[0] = v;
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tPutU16(uint8_t* p, uint16_t v) {
if (p) ((uint16_t*)p)[0] = v;
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tPutI16(uint8_t* p, int16_t v) {
if (p) ((int16_t*)p)[0] = v;
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tPutU32(uint8_t* p, uint32_t v) {
if (p) ((uint32_t*)p)[0] = v;
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tPutI32(uint8_t* p, int32_t v) {
if (p) ((int32_t*)p)[0] = v;
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tPutU64(uint8_t* p, uint64_t v) {
if (p) ((uint64_t*)p)[0] = v;
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
if (p) ((int64_t*)p)[0] = v;
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); }
static FORCE_INLINE int32_t tPutU64v(uint8_t* p, uint64_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI64v(uint8_t* p, int64_t v) { return tPutU64v(p, ZIGZAGE(int64_t, v)); }
// GET
static FORCE_INLINE int32_t tGetU8(uint8_t* p, uint8_t* v) {
if (v) *v = ((uint8_t*)p)[0];
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) {
if (v) *v = ((int8_t*)p)[0];
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tGetU16(uint8_t* p, uint16_t* v) {
if (v) *v = ((uint16_t*)p)[0];
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tGetI16(uint8_t* p, int16_t* v) {
if (v) *v = ((int16_t*)p)[0];
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tGetU32(uint8_t* p, uint32_t* v) {
if (v) *v = ((uint32_t*)p)[0];
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tGetI32(uint8_t* p, int32_t* v) {
if (v) *v = ((int32_t*)p)[0];
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tGetU64(uint8_t* p, uint64_t* v) {
if (v) *v = ((uint64_t*)p)[0];
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) {
if (v) *v = ((int64_t*)p)[0];
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
int32_t n;
uint16_t tv;
n = tGetU16v(p, &tv);
if (v) *v = ZIGZAGD(int16_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
int32_t n;
uint32_t tv;
n = tGetU32v(p, &tv);
if (v) *v = ZIGZAGD(int32_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU64v(uint8_t* p, uint64_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
int32_t n;
uint64_t tv;
n = tGetU64v(p, &tv);
if (v) *v = ZIGZAGD(int64_t, tv);
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
n += tPutU32v(p ? p + n : p, nData);
if (p) memcpy(p + n, pData, nData);
n += nData;
return n;
}
static FORCE_INLINE int32_t tGetBinary(const uint8_t* p, const uint8_t** ppData, uint32_t* nData) {
static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* nData) {
int32_t n = 0;
uint32_t tv = 0;
uint32_t t;
for (;;) {
if (p[n] <= 0x7f) {
t = p[n];
tv |= (t << (7 * n));
n++;
break;
}
t = p[n] & 0x7f;
tv |= (t << (7 * n));
n++;
}
uint32_t nt;
if (nData) *nData = n;
n += tGetU32v(p, &nt);
if (nData) *nData = nt;
if (ppData) *ppData = p + n;
n += nt;
n += tv;
return n;
}
......
......@@ -714,25 +714,31 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL;
double ts = (double)strtoll(value, &endPtr, 10);
int64_t tsInt64 = strtoll(value, &endPtr, 10);
if(value + len != endPtr){
return -1;
}
double ts = tsInt64;
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= (3600 * 1e9);
tsInt64 *= (3600 * 1e9);
break;
case TSDB_TIME_PRECISION_MINUTES:
ts *= (60 * 1e9);
tsInt64 *= (60 * 1e9);
break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9);
tsInt64 *= (1e9);
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= (1e6);
tsInt64 *= (1e6);
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= (1e3);
tsInt64 *= (1e3);
break;
case TSDB_TIME_PRECISION_NANO:
break;
......@@ -743,7 +749,7 @@ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
return -1;
}
return (int64_t)ts;
return tsInt64;
}
static int64_t smlGetTimeNow(int8_t precision) {
......
......@@ -260,8 +260,8 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
if (STMT_TYPE_QUERY != pStmt->sql.type || freeRequest) {
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
......@@ -280,7 +280,11 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
continue;
}
qFreeStmtDataBlock(pBlocks);
if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
qFreeStmtDataBlock(pBlocks);
} else {
qDestroyStmtDataBlock(pBlocks);
}
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
......@@ -320,11 +324,11 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosHashCleanup(pStmt->sql.pTableCache);
pStmt->sql.pTableCache = NULL;
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
return TSDB_CODE_SUCCESS;
}
......
......@@ -1188,3 +1188,36 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, sml_TD15662_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_15662 precision 'ns'");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_15662");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"iyyyje,id=iyyyje_41943_1303,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
};
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
ASSERT_EQ(ret, 0);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_a5615048edae55218a22a149edebdc82");
ASSERT_NE(res, nullptr);
TAOS_ROW row = taos_fetch_row(res);
int64_t ts = *(int64_t*)row[0];
ASSERT_EQ(ts, 1626006833639000000);
taos_free_result(res);
}
\ No newline at end of file
此差异已折叠。
......@@ -4093,6 +4093,7 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
}
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1;
tEndEncode(pEncoder);
return 0;
......@@ -4111,6 +4112,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
}
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
tEndDecode(pDecoder);
return 0;
......
......@@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
PutToQueueFp fp = pMsgCb->queueFps[qtype];
return (*fp)(pMsgCb->mgmt, pReq);
return (*fp)(pMsgCb->mgmt, pMsg);
}
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
......@@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*fp)(pMsgCb->mgmt, vgId, qtype);
}
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) {
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
return (*fp)(epSet, pReq);
return (*fp)(epSet, pMsg);
}
void tmsgSendRsp(const SRpcMsg* pMsg) {
void tmsgSendRsp(SRpcMsg* pMsg) {
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pMsg);
}
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
(*fp)(pMsg, pNewEpSet);
}
......
......@@ -374,39 +374,135 @@ char getPrecisionUnit(int32_t precision) {
}
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
double tempResult = (double)time;
switch(fromPrecision) {
case TSDB_TIME_PRECISION_MILLI: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time;
case TSDB_TIME_PRECISION_MICRO:
tempResult *= 1000;
time *= 1000;
goto end_;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000000;
time *= 1000000;
goto end_;
}
} // end from milli
case TSDB_TIME_PRECISION_MICRO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000;
case TSDB_TIME_PRECISION_MICRO:
return time;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000;
time *= 1000;
goto end_;
}
} //end from micro
case TSDB_TIME_PRECISION_NANO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000000;
case TSDB_TIME_PRECISION_MICRO:
return time / 1000;
case TSDB_TIME_PRECISION_NANO:
return time;
}
} //end from nano
default: {
assert(0);
return time; // only to pass windows compilation
}
} //end switch fromPrecision
end_:
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
return time;
}
// !!!!notice:there are precision problems, double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
//int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
// fromPrecision == TSDB_TIME_PRECISION_NANO);
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
// toPrecision == TSDB_TIME_PRECISION_NANO);
// static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
// ((double)time * factors[fromPrecision][toPrecision]);
//}
// !!!!notice: double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3] = {1000000., 1000., 1.};
int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
double tmp = time;
switch (toUnit) {
case 's':
return time * factors[fromPrecision] / NANOSECOND_PER_SEC;
case 's':{
tmp /= (NANOSECOND_PER_SEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_SEC/factors[fromPrecision]);
break;
}
case 'm':
return time * factors[fromPrecision] / NANOSECOND_PER_MINUTE;
tmp /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]);
break;
case 'h':
return time * factors[fromPrecision] / NANOSECOND_PER_HOUR;
tmp /= (NANOSECOND_PER_HOUR/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_HOUR/factors[fromPrecision]);
break;
case 'd':
return time * factors[fromPrecision] / NANOSECOND_PER_DAY;
tmp /= (NANOSECOND_PER_DAY/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_DAY/factors[fromPrecision]);
break;
case 'w':
return time * factors[fromPrecision] / NANOSECOND_PER_WEEK;
tmp /= (NANOSECOND_PER_WEEK/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_WEEK/factors[fromPrecision]);
break;
case 'a':
return time * factors[fromPrecision] / NANOSECOND_PER_MSEC;
tmp /= (NANOSECOND_PER_MSEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MSEC/factors[fromPrecision]);
break;
case 'u':
return time * factors[fromPrecision] / NANOSECOND_PER_USEC;
// the result of (NANOSECOND_PER_USEC/(double)factors[fromPrecision]) maybe a double
switch (fromPrecision) {
case TSDB_TIME_PRECISION_MILLI:{
tmp *= 1000;
time *= 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO:{
tmp /= 1;
time /= 1;
break;
}
case TSDB_TIME_PRECISION_NANO:{
tmp /= 1000;
time /= 1000;
break;
}
}
break;
case 'b':
return time * factors[fromPrecision];
tmp *= factors[fromPrecision];
time *= factors[fromPrecision];
break;
default: {
return -1;
}
}
if (tmp >= (double)INT64_MAX) return INT64_MAX;
if (tmp <= (double)INT64_MIN) return INT64_MIN;
return time;
}
int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal) {
......
......@@ -17,6 +17,15 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/libs/common/inc"
)
# dataformatTest.cpp
add_executable(dataformatTest "")
target_sources(
dataformatTest
PRIVATE
"dataformatTest.cpp"
)
target_link_libraries(dataformatTest gtest gtest_main util)
# tmsg test
# add_executable(tmsgTest "")
# target_sources(tmsgTest
......
#include "gtest/gtest.h"
\ No newline at end of file
#include <gtest/gtest.h>
#include "trow.h"
TEST(td_row_test, build_row_to_target) {
#if 0
char dst[1024];
SRow* pRow = (SRow*)dst;
int ncols = 10;
col_id_t cid;
void* pData;
SRowBuilder rb = trbInit(TD_OR_ROW_BUILDER, NULL, 0, pRow, 1024);
trbSetRowInfo(&rb, false, 0);
trbSetRowTS(&rb, 1637550210000);
for (int c = 0; c < ncols; c++) {
cid = c;
if (trbWriteCol(&rb, pData, cid) < 0) {
// TODO
}
}
#endif
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL);
taosThreadClear(&pMgmt->statusThread);
}
}
......@@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL);
taosThreadClear(&pMgmt->monitorThread);
}
}
......
......@@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread);
}
taosMemoryFree(pThread->pCfgs);
}
......
......@@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper);
int32_t dmRunProc(SProc *proc);
void dmStopProc(SProc *proc);
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmCloseProcRpcHandles(SProc *proc);
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
// dmTransport.c
int32_t dmInitServer(SDnode *pDnode);
......
......@@ -58,6 +58,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
return -1;
}
taosIgnSignal(SIGCHLD);
pWrapper->proc.pid = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0;
......@@ -254,7 +255,7 @@ static void dmWatchNodes(SDnode *pDnode) {
if (!OnlyInParentProc(pWrapper)) continue;
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
dError("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
dmCloseProcRpcHandles(&pWrapper->proc);
dmNewProc(pWrapper, ntype);
}
......
......@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
static int32_t dmInitProcMutex(SProcQueue *queue) {
TdThreadMutexAttr mattr = {0};
......@@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
return queue;
}
#if 0
static void dmDestroyProcQueue(SProcQueue *queue) {
if (queue->mutex != NULL) {
taosThreadMutexDestroy(queue->mutex);
queue->mutex = NULL;
}
}
static void dmDestroyProcSem(SProcQueue *queue) {
if (queue->sem != NULL) {
tsem_destroy(queue->sem);
queue->sem = NULL;
}
}
#endif
static void dmCleanupProcQueue(SProcQueue *queue) {}
static void dmCleanupProcQueue(SProcQueue *queue) {
#if 0
if (queue != NULL) {
dmDestroyProcQueue(queue);
dmDestroyProcSem(queue);
}
#endif
}
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen);
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
const void *pHead = pMsg;
const void *pBody = pMsg->pCont;
const int16_t rawHeadLen = sizeof(SRpcMsg);
const int32_t rawBodyLen = pMsg->contLen;
const int16_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
const int64_t handle = (int64_t)pMsg->info.handle;
taosThreadMutexLock(&queue->mutex);
if (fullLen > queue->avail) {
......@@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
return -1;
}
if (handle != 0 && ftype == DND_FUNC_REQ) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
}
......@@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
if (queue->tail < queue->head) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
queue->tail = queue->tail + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->tail;
if (remain == 0) {
memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
queue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(queue->pBuffer, pHead, rawHeadLen);
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8));
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen);
} else {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
queue->tail = queue->tail + headLen + bodyLen + 8;
}
}
......@@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
pBody, bodyLen, pos, queue->items);
dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
return 0;
}
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
EProcFuncType *pFuncType) {
static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
tsem_wait(&queue->sem);
taosThreadMutexLock(&queue->mutex);
......@@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
void *pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || pBody == NULL) {
void *pBody = NULL;
if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
taosFreeQitem(pHead);
......@@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
const int32_t pos = queue->head;
if (queue->head < queue->tail) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
queue->head = queue->head + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->head;
if (remain == 0) {
memcpy(pHead, queue->pBuffer + 8, headLen);
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
queue->head = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pHead, queue->pBuffer, headLen);
memcpy(pBody, queue->pBuffer + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
queue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
queue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
queue->head = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
queue->head = queue->head + headLen + bodyLen + 8;
}
}
......@@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
queue->items--;
taosThreadMutexUnlock(&queue->mutex);
*ppHead = pHead;
*ppBody = pBody;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*ppMsg = pHead;
(*ppMsg)->pCont = pBody;
*pFuncType = (EProcFuncType)ftype;
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
bodyLen, pos, queue->items);
dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
(*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
return 1;
}
......@@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pReq = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from cqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
break;
......@@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) {
}
if (ftype != DND_FUNC_REQ) {
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
} else {
pReq = pHead;
pReq->pCont = pBody;
code = dmProcessNodeMsg(pWrapper, pReq);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
SRpcMsg rspMsg = {
.info = pReq->info,
.pCont = pReq->info.rsp,
.contLen = pReq->info.rspLen,
};
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
rpcFreeCont(rspMsg.pCont);
}
dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
code = dmProcessNodeMsg(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
SRpcMsg rsp = {
.code = (terrno != 0 ? terrno : code),
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
} while (1);
......@@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from pqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
break;
......@@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) {
}
if (ftype == DND_FUNC_RSP) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcSendResponse(pRsp);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcSendResponse(pMsg);
} else if (ftype == DND_FUNC_REGIST) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
rpcRegisterBrokenLinkArg(pRsp);
rpcRegisterBrokenLinkArg(pMsg);
} else if (ftype == DND_FUNC_RELEASE) {
pRsp = pHead;
pRsp->pCont = NULL;
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
} else {
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
rpcFreeCont(pBody);
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
}
taosFreeQitem(pHead);
taosFreeQitem(pMsg);
} while (1);
return NULL;
......@@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash);
proc->hash = NULL;
dDebug("node:%s, proc is cleaned up", pWrapper->name);
}
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&proc->cqueue->mutex);
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(proc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&proc->cqueue->mutex);
return ref;
}
void dmCloseProcRpcHandles(SProc *proc) {
taosThreadMutexLock(&proc->cqueue->mutex);
void *h = taosHashIterate(proc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
h = taosHashIterate(proc->hash, h);
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
while (pInfo != NULL) {
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
pInfo = taosHashIterate(proc->hash, pInfo);
}
taosHashClear(proc->hash);
taosThreadMutexUnlock(&proc->cqueue->mutex);
}
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) {
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
int32_t retry = 0;
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
retry++;
taosMsleep(retry);
while (1) {
if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
break;
}
if (retry == 10) {
pMsg->code = terrno;
if (pMsg->contLen > 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
pMsg->contLen = 0;
}
dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
} else {
dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
retry++;
taosMsleep(retry);
}
}
}
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t ref, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
}
......@@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
DND_FUNC_REQ);
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
} else {
code = dmProcessNodeMsg(pWrapper, pMsg);
}
......@@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
}
}
static inline void dmSendRsp(const SRpcMsg *pMsg) {
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
} else {
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
}
}
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
......@@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pRsp->info;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
}
}
......@@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
......@@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SMgmtWrapper *pWrapper = pHandle->wrapper;
if (InChildProc(pWrapper)) {
SRpcMsg msg = {.code = type, .info = *pHandle};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
} else {
rpcReleaseHandle(pHandle->handle, type);
}
......
......@@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) {
pMnode->stopped = true;
if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL);
memset(&pMnode->thread, 0, sizeof(pMnode->thread));
taosThreadClear(&pMnode->thread);
}
}
......
......@@ -16,10 +16,9 @@
#include "tcache.h"
void reportStartup(const char *name, const char *desc) {}
void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
// rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
......
......@@ -213,8 +213,9 @@ int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valL
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
memcpy(pVal, pRaw->pData + dataPos, valLen);
if (pVal != NULL) {
memcpy(pVal, pRaw->pData + dataPos, valLen);
}
return 0;
}
......@@ -235,4 +236,4 @@ int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) {
}
return sizeof(SSdbRaw) + pRaw->dataLen;
}
\ No newline at end of file
}
......@@ -57,56 +57,56 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
}
// open pTbDb
ret = tdbDbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
ret = tdbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSkmDb
ret = tdbDbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
ret = tdbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pUidIdx
ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
ret = tdbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pNameIdx
ret = tdbDbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
ret = tdbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pCtbIdx
ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
ret = tdbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTagIdx
ret = tdbDbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
ret = tdbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTtlIdx
ret = tdbDbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
ret = tdbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSmaIdx
ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
ret = tdbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
......@@ -125,14 +125,14 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
......@@ -142,14 +142,14 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
......
......@@ -35,7 +35,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
if (tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _err;
}
......@@ -58,7 +58,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
int64_t version;
// query uid.idx
if (tdbDbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......@@ -72,7 +72,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
tb_uid_t uid;
// query name.idx
if (tdbDbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
metaRLock(pMeta);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
ret = tdbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
metaULock(pMeta);
if (ret < 0) {
return NULL;
......@@ -413,7 +413,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
}
if (pTSma->tagsFilterLen > 0) {
if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
......@@ -421,14 +421,14 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
goto _err;
}
}
memcpy((void*)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
} else {
pTSma->exprLen = 0;
pTSma->expr = NULL;
pTSma->tagsFilterLen = 0;
pTSma->tagsFilter = NULL;
}
++smaIdx;
}
......
......@@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
......@@ -130,17 +130,17 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
return tdbDbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
......
......@@ -389,7 +389,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
int c;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
......@@ -536,7 +536,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
int nData = 0;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
......@@ -573,9 +573,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc);
/* get stbEntry*/
tdbDbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbDbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal);
tDecoderInit(&dc, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc, &stbEntry);
......@@ -632,7 +632,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaSaveToTbDb(pMeta, &ctbEntry);
// save to uid.idx
tdbDbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
tdbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
......@@ -708,7 +708,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
......@@ -721,11 +721,11 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
......@@ -748,12 +748,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
ttlKey.uid = pME->uid;
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
}
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
......@@ -801,10 +801,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SDecoder dc = {0};
// get super table
tdbDbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tdbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData;
tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &stbEntry);
......@@ -817,7 +817,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
&pTagIdxKey, &nTagIdxKey) < 0) {
return -1;
}
tdbDbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
tdbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc);
......@@ -859,7 +859,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
rcode = -1;
goto _exit;
}
......
......@@ -59,14 +59,14 @@ static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
// Create a database
compFunc = tdSmaKeyCmpr;
if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
if (tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
return -1;
}
return 0;
}
static int32_t smaCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
static int32_t smaCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
......@@ -99,7 +99,7 @@ int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, in
int32_t ret;
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
smaError("failed to upsert tsma data into db, ret = %d", ret);
return -1;
......@@ -112,7 +112,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_
void *pVal = NULL;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
smaError("failed to get tsma data from db, ret = %d", ret);
......
......@@ -309,6 +309,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
TSKEY keyMin;
TSKEY keyMax;
SSubmitBlk *pBlkCopy;
int64_t sverNew;
// check if table exists
SMetaReader mr = {0};
......@@ -319,6 +320,12 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver;
} else {
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
sverNew = mr.me.stbEntry.schema.sver;
}
metaReaderClear(&mr);
// create container is nedd
......@@ -367,6 +374,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
pRsp->numOfRows = pMsgIter->numOfRows;
pRsp->affectedRows = pMsgIter->numOfRows;
pRsp->sver = sverNew;
return 0;
}
......
......@@ -60,12 +60,12 @@ static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
// Create a database
compFunc = tsdbSmaKeyCmpr;
ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
ret = tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
return 0;
}
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
......@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1;
......@@ -110,7 +110,7 @@ void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32
void *pVal = NULL;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
tsdbError("Failed to get sma data from db, ret = %d", ret);
......
......@@ -131,6 +131,7 @@ typedef struct SCtgCacheStat {
uint64_t dbNum;
uint64_t tblNum;
uint64_t stblNum;
uint64_t userNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tblHitNum;
......
......@@ -122,6 +122,11 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
ctgFreeTableMetaCache(&dbCache->tbCache);
}
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
taosHashCleanup(userCache->createdDbs);
taosHashCleanup(userCache->readDbs);
taosHashCleanup(userCache->writeDbs);
}
void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
......@@ -145,7 +150,24 @@ void ctgFreeHandle(SCatalog* pCtg) {
CTG_CACHE_STAT_SUB(dbNum, dbNum);
}
if (pCtg->userCache) {
int32_t userNum = taosHashGetSize(pCtg->userCache);
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_SUB(userNum, userNum);
}
taosMemoryFree(pCtg);
}
......@@ -2031,10 +2053,13 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
SName stbName = *pTableName;
strcpy(stbName.tname, output->tbName);
taosMemoryFreeClear(output->tbMeta);
CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &inCache, flag, NULL));
if (!inCache) {
ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
continue;
}
......@@ -2306,6 +2331,8 @@ int32_t ctgActUpdateUser(SCtgMetaAction *action) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
......
......@@ -14,6 +14,7 @@
*/
#include "builtins.h"
#include "querynodes.h"
#include "builtinsimpl.h"
#include "scalar.h"
#include "taoserror.h"
......@@ -201,15 +202,32 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraNum = LIST_LENGTH(pFunc->pParameterList);
if (2 != paraNum) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*) pParamNode;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
}
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
return translateTop(pFunc, pErrBuf, len);
}
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
......
......@@ -2497,7 +2497,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo
pInfo->min = MAX_TS_KEY;
pInfo->max = 0;
if (pCtx->numOfParams == 3) {
if (pCtx->numOfParams == 2) {
pInfo->timeUnit = pCtx->param[1].param.i;
} else {
pInfo->timeUnit = 1;
......@@ -2521,7 +2521,6 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
}
if (pInput->colDataAggIsSet) {
if (pInfo->min == MAX_TS_KEY) {
pInfo->min = GET_INT64_VAL(&pAgg->min);
pInfo->max = GET_INT64_VAL(&pAgg->max);
......
......@@ -1528,7 +1528,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (session->outputType != output->columnData->info.type
|| session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)",
session->outputType, session->outputLen,
output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
releaseUdfFuncHandle(udfName);
return code;
}
......
......@@ -139,7 +139,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
......
......@@ -361,7 +361,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = *dst - tlen;
*dst = (char*) * dst - tlen;
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
......
......@@ -473,17 +473,16 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
int32_t sz = 0;
char* ch = (char*)fstSliceData(s, &sz);
// char* tmp = taosMemoryCalloc(1, sz + 1);
// memcpy(tmp, ch, sz);
char* tmp = taosMemoryCalloc(1, sz + 1);
memcpy(tmp, ch, sz);
if (0 != strncmp(ch, p, skip)) {
if (0 != strncmp(tmp, p, skip)) {
swsResultDestroy(rt);
// taosMemoryFree(tmp);
taosMemoryFree(tmp);
break;
}
TExeCond cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
......@@ -491,7 +490,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
swsResultDestroy(rt);
break;
}
// taosMemoryFree(tmp);
taosMemoryFree(tmp);
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
......
......@@ -578,3 +578,40 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) {
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_DOUBLE) {
{
double val = 10.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i);
}
}
{
double val = 2.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i + 1000);
}
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 1.9;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(2000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
{
std::string colName("test1");
SArray* res = NULL;
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}
......@@ -1045,6 +1045,7 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj);
taosHashCleanup(pCxt->pSubTableHashObj);
taosHashCleanup(pCxt->pTableNameHashObj);
destroyBlockHashmap(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pVgDataBlocks);
......
......@@ -235,12 +235,12 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
}
taosMemoryFreeClear(pDataBlock->pData);
if (!pDataBlock->cloned) {
// if (!pDataBlock->cloned) {
// free the refcount for metermeta
taosMemoryFreeClear(pDataBlock->pTableMeta);
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
// }
taosMemoryFreeClear(pDataBlock);
}
......
......@@ -1438,24 +1438,24 @@ int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_RET(code);
SCH_ERR_JRET(code);
}
SSchedulerHbRsp rsp = {0};
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTrans trans = {0};
trans.transInst = pParam->transport;
trans.transHandle = pMsg->handle;
SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
......@@ -1483,6 +1483,7 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
SCH_RET(code);
}
......
......@@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
int32_t ret = 0;
atomic_store_8(&io->isStart, 0);
taosThreadJoin(io->consumerTid, NULL);
taosThreadClear(&io->consumerTid);
taosTmrCleanUp(io->timerMgr);
return ret;
}
......
......@@ -37,14 +37,14 @@ int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv, TXN *pTxn);
// TDB
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbClose(TDB *pDb);
int tdbDrop(TDB *pDb);
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// TDBC
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn);
......
......@@ -24,7 +24,7 @@ struct STDBC {
SBTC btc;
};
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) {
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) {
TDB *pDb;
SPager *pPager;
int ret;
......@@ -65,7 +65,7 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn
return 0;
}
int tdbDbClose(TDB *pDb) {
int tdbClose(TDB *pDb) {
if (pDb) {
tdbBtreeClose(pDb->pBt);
tdbOsFree(pDb);
......@@ -73,26 +73,26 @@ int tdbDbClose(TDB *pDb) {
return 0;
}
int tdbDbDrop(TDB *pDb) {
int tdbDrop(TDB *pDb) {
// TODO
return 0;
}
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
}
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn);
}
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen);
}
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
return tdbBtreePGet(pDb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen);
}
......
......@@ -131,7 +131,7 @@ TEST(tdb_test, simple_insert1) {
// Create a database
compFunc = tKeyCmpr;
ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
......@@ -152,7 +152,7 @@ TEST(tdb_test, simple_insert1) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
......@@ -181,7 +181,7 @@ TEST(tdb_test, simple_insert1) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen);
ret = tdbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
......@@ -224,11 +224,11 @@ TEST(tdb_test, simple_insert1) {
}
}
ret = tdbDbDrop(pDb);
ret = tdbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbDbClose(pDb);
tdbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
......@@ -251,7 +251,7 @@ TEST(tdb_test, simple_insert2) {
// Create a database
compFunc = tDefaultKeyCmpr;
ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
......@@ -271,7 +271,7 @@ TEST(tdb_test, simple_insert2) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -311,11 +311,11 @@ TEST(tdb_test, simple_insert2) {
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
ret = tdbDbDrop(pDb);
ret = tdbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbDbClose(pDb);
tdbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
......@@ -346,7 +346,7 @@ TEST(tdb_test, simple_delete1) {
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
......@@ -356,7 +356,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -365,7 +365,7 @@ TEST(tdb_test, simple_delete1) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0);
}
......@@ -374,7 +374,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = nKV - 1; iData > 30; iData--) {
sprintf(key, "key%d", iData);
ret = tdbDbDelete(pDb, key, strlen(key), &txn);
ret = tdbDelete(pDb, key, strlen(key), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -382,7 +382,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
if (iData <= 30) {
GTEST_ASSERT_EQ(ret, 0);
} else {
......@@ -413,7 +413,7 @@ TEST(tdb_test, simple_delete1) {
closePool(pPool);
tdbDbClose(pDb);
tdbClose(pDb);
tdbEnvClose(pEnv);
}
......@@ -435,7 +435,7 @@ TEST(tdb_test, simple_upsert1) {
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
pPool = openPool();
......@@ -446,7 +446,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -454,7 +454,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
......@@ -463,7 +463,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -473,11 +473,11 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
tdbDbClose(pDb);
tdbClose(pDb);
tdbEnvClose(pEnv);
}
\ No newline at end of file
......@@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0)
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
......@@ -227,7 +227,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
static void* cliWorkThread(void* arg);
......@@ -924,8 +924,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroy((uv_handle_t*)pConn->stream);
cliDestroyConn(pConn, true);
return -1;
}
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
......
......@@ -244,6 +244,7 @@ static void walStopThread() {
if (taosCheckPthreadValid(tsWal.thread)) {
taosThreadJoin(tsWal.thread, NULL);
taosThreadClear(&tsWal.thread);
}
wDebug("wal thread is stopped");
......
......@@ -145,6 +145,7 @@ void taosCloseLog() {
taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
}
tsLogInited = 0;
......
......@@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
taosThreadJoin(pSched->qthread[i], NULL);
taosThreadClear(&pSched->qthread[i]);
}
}
......
......@@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
}
}
......@@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
taosFreeQall(worker->qall);
taosCloseQset(worker->qset);
}
......
......@@ -251,16 +251,12 @@ CaseCtrl gCaseCtrl = {
.bindColNum = 0,
.bindTagNum = 0,
.bindRowNum = 0,
.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList,
.bindTagTypeNum = 0,
.bindTagTypeList = NULL,
.optrIdxListNum = tListLen(optrIdxList),
.optrIdxList = optrIdxList,
.checkParamNum = false,
.printRes = false,
.runTimes = 0,
.caseIdx = 23,
.caseIdx = 1,
.caseNum = 1,
.caseRunIdx = -1,
.caseRunNum = 1,
......@@ -1101,7 +1097,9 @@ void destroyData(BindData *data) {
taosMemoryFree(data->binaryLen);
taosMemoryFree(data->isNull);
taosMemoryFree(data->pBind);
taosMemoryFree(data->pTags);
taosMemoryFree(data->colTypes);
taosMemoryFree(data->sql);
}
void bpFetchRows(TAOS_RES *result, bool printr, int32_t *rows) {
......
......@@ -468,14 +468,10 @@ class TDTestCase:
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
#self.tmqCase8(cfgPath, buildPath)
#self.tmqCase9(cfgPath, buildPath)
#self.tmqCase10(cfgPath, buildPath)
self.tmqCase8(cfgPath, buildPath)
self.tmqCase9(cfgPath, buildPath)
self.tmqCase10(cfgPath, buildPath)
self.tmqCase11(cfgPath, buildPath)
# self.tmqCase12(cfgPath, buildPath)
# self.tmqCase13(cfgPath, buildPath)
# self.tmqCase14(cfgPath, buildPath)
def stop(self):
tdSql.close()
......
......@@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) {
taosMsleep(300);
for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL);
taosThreadClear(&pInfo[i].thread);
}
int64_t maxDelay = 0;
......
......@@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
......
......@@ -56,6 +56,7 @@ void simFreeScript(SScript *script) {
bgScript->killed = true;
if (taosCheckPthreadValid(bgScript->bgPid)) {
taosThreadJoin(bgScript->bgPid, NULL);
taosThreadClear(&bgScript->bgPid);
}
simDebug("script:%s, background thread joined", bgScript->fileName);
......
......@@ -985,6 +985,7 @@ int32_t shellExecute() {
while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册