提交 0c382080 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

...@@ -226,10 +226,10 @@ endif(${BUILD_WITH_NURAFT}) ...@@ -226,10 +226,10 @@ endif(${BUILD_WITH_NURAFT})
if(${BUILD_PTHREAD}) if(${BUILD_PTHREAD})
set(CMAKE_BUILD_TYPE release) set(CMAKE_BUILD_TYPE release)
add_definitions(-DPTW32_STATIC_LIB) add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread) add_subdirectory(pthread EXCLUDE_FROM_ALL)
set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread) set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
add_library(pthread STATIC IMPORTED GLOBAL) add_library(pthread INTERFACE)
SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib) target_link_libraries(pthread INTERFACE libpthreadVC3)
endif() endif()
# iconv # iconv
......
...@@ -29,27 +29,42 @@ extern "C" { ...@@ -29,27 +29,42 @@ extern "C" {
typedef struct SSchema SSchema; typedef struct SSchema SSchema;
typedef struct STColumn STColumn; typedef struct STColumn STColumn;
typedef struct STSchema STSchema; typedef struct STSchema STSchema;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2; typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder; typedef struct STSRowBuilder STSRowBuilder;
typedef struct SKVIdx SKVIdx; typedef struct STagVal STagVal;
typedef struct STag STag;
// STSchema
// STSRow2
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow);
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow);
// STSchema // STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema); int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema); void tTSchemaDestroy(STSchema *pTSchema);
// SColVal
#define ColValNONE ((SColVal){.type = COL_VAL_NONE, .nData = 0, .pData = NULL})
#define ColValNULL ((SColVal){.type = COL_VAL_NULL, .nData = 0, .pData = NULL})
#define ColValDATA(nData, pData) ((SColVal){.type = COL_VAL_DATA, .nData = (nData), .pData = (pData)})
// STSRow2
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// STSRowBuilder // STSRowBuilder
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols); int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema);
void tTSRowBuilderClear(STSRowBuilder *pBuilder); void tTSRowBuilderClear(STSRowBuilder *pBuilder);
void tTSRowBuilderReset(STSRowBuilder *pBuilder); void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData); int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
void tTagFree(STag *pTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag);
// STRUCT ================= // STRUCT =================
struct STColumn { struct STColumn {
col_id_t colId; col_id_t colId;
...@@ -68,31 +83,47 @@ struct STSchema { ...@@ -68,31 +83,47 @@ struct STSchema {
STColumn columns[]; STColumn columns[];
}; };
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
struct STSRow2 { struct STSRow2 {
TSKEY ts; TSKEY ts;
uint32_t flags; uint8_t flags;
union { int32_t sver;
int32_t sver; uint32_t nData;
int32_t ncols; uint8_t *pData;
};
uint32_t nData;
const uint8_t *pData;
}; };
struct STSRowBuilder { struct STSRowBuilder {
STColumn *pTColumn;
STSchema *pTSchema; STSchema *pTSchema;
int32_t szBitMap1;
int32_t szBitMap2;
int32_t szKVBuf; int32_t szKVBuf;
uint8_t *pKVBuf; uint8_t *pKVBuf;
int32_t szTPBuf; int32_t szTPBuf;
uint8_t *pTPBuf; uint8_t *pTPBuf;
int32_t nCols; int32_t iCol;
int32_t kvVLen; int32_t vlenKV;
int32_t tpVLen; int32_t vlenTP;
STSRow2 row; STSRow2 row;
}; };
#if 1 //==================================== typedef enum { COL_VAL_NONE = 0, COL_VAL_NULL = 1, COL_VAL_DATA = 2 } EColValT;
struct SColVal {
EColValT type;
uint32_t nData;
uint8_t *pData;
};
struct STagVal {
int16_t cid;
int8_t type;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP #define TD_SUPPORT_BITMAP
#define TD_SUPPORT_READ2 #define TD_SUPPORT_READ2
......
...@@ -275,10 +275,10 @@ int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); ...@@ -275,10 +275,10 @@ int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
void tFreeSSubmitRsp(SSubmitRsp* pRsp); void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1) #define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2) #define COL_IDX_ON ((int8_t)0x2)
#define COL_VAL_SET ((int8_t)0x4) #define COL_SET_NULL ((int8_t)0x10)
#define COL_SET_VAL ((int8_t)0x20)
typedef struct SSchema { typedef struct SSchema {
int8_t type; int8_t type;
int8_t flags; int8_t flags;
...@@ -287,6 +287,9 @@ typedef struct SSchema { ...@@ -287,6 +287,9 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) #define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define SSCHMEA_TYPE(s) ((s)->type) #define SSCHMEA_TYPE(s) ((s)->type)
......
...@@ -41,8 +41,8 @@ typedef enum { ...@@ -41,8 +41,8 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg); typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(const SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);
...@@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); ...@@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(const SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
......
...@@ -456,52 +456,189 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) { ...@@ -456,52 +456,189 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) {
return p; return p;
} }
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, const uint8_t* pData, uint32_t nData) { // ===========================================
int n = 0; #define tPutV(p, v) \
uint32_t v = nData; do { \
int32_t n = 0; \
for (;;) { for (;;) { \
if (v <= 0x7f) { if (v <= 0x7f) { \
if (p) p[n] = v; if (p) p[n] = v; \
n++; n++; \
break; break; \
} } \
if (p) p[n] = (v & 0x7f) | 0x80; \
if (p) p[n] = (v & 0x7f) | 0x80; n++; \
n++; v >>= 7; \
v >>= 7; } \
} return n; \
} while (0)
if (p) { #define tGetV(p, v) \
memcpy(p + n, pData, nData); do { \
} int32_t n = 0; \
if (v) *v = 0; \
for (;;) { \
if (p[n] <= 0x7f) { \
if (v) (*v) |= (p[n] << (7 * n)); \
n++; \
break; \
} \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
n++; \
} \
return n; \
} while (0)
// PUT
static FORCE_INLINE int32_t tPutU8(uint8_t* p, uint8_t v) {
if (p) ((uint8_t*)p)[0] = v;
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) {
if (p) ((int8_t*)p)[0] = v;
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tPutU16(uint8_t* p, uint16_t v) {
if (p) ((uint16_t*)p)[0] = v;
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tPutI16(uint8_t* p, int16_t v) {
if (p) ((int16_t*)p)[0] = v;
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tPutU32(uint8_t* p, uint32_t v) {
if (p) ((uint32_t*)p)[0] = v;
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tPutI32(uint8_t* p, int32_t v) {
if (p) ((int32_t*)p)[0] = v;
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tPutU64(uint8_t* p, uint64_t v) {
if (p) ((uint64_t*)p)[0] = v;
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
if (p) ((int64_t*)p)[0] = v;
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); }
static FORCE_INLINE int32_t tPutU64v(uint8_t* p, uint64_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI64v(uint8_t* p, int64_t v) { return tPutU64v(p, ZIGZAGE(int64_t, v)); }
// GET
static FORCE_INLINE int32_t tGetU8(uint8_t* p, uint8_t* v) {
if (v) *v = ((uint8_t*)p)[0];
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) {
if (v) *v = ((int8_t*)p)[0];
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tGetU16(uint8_t* p, uint16_t* v) {
if (v) *v = ((uint16_t*)p)[0];
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tGetI16(uint8_t* p, int16_t* v) {
if (v) *v = ((int16_t*)p)[0];
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tGetU32(uint8_t* p, uint32_t* v) {
if (v) *v = ((uint32_t*)p)[0];
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tGetI32(uint8_t* p, int32_t* v) {
if (v) *v = ((int32_t*)p)[0];
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tGetU64(uint8_t* p, uint64_t* v) {
if (v) *v = ((uint64_t*)p)[0];
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) {
if (v) *v = ((int64_t*)p)[0];
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
int32_t n;
uint16_t tv;
n = tGetU16v(p, &tv);
if (v) *v = ZIGZAGD(int16_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
int32_t n;
uint32_t tv;
n = tGetU32v(p, &tv);
if (v) *v = ZIGZAGD(int32_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU64v(uint8_t* p, uint64_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
int32_t n;
uint64_t tv;
n = tGetU64v(p, &tv);
if (v) *v = ZIGZAGD(int64_t, tv);
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
n += tPutU32v(p ? p + n : p, nData);
if (p) memcpy(p + n, pData, nData);
n += nData; n += nData;
return n; return n;
} }
static FORCE_INLINE int32_t tGetBinary(const uint8_t* p, const uint8_t** ppData, uint32_t* nData) { static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* nData) {
int32_t n = 0; int32_t n = 0;
uint32_t tv = 0; uint32_t nt;
uint32_t t;
for (;;) {
if (p[n] <= 0x7f) {
t = p[n];
tv |= (t << (7 * n));
n++;
break;
}
t = p[n] & 0x7f;
tv |= (t << (7 * n));
n++;
}
if (nData) *nData = n; n += tGetU32v(p, &nt);
if (nData) *nData = nt;
if (ppData) *ppData = p + n; if (ppData) *ppData = p + n;
n += nt;
n += tv;
return n; return n;
} }
......
...@@ -714,25 +714,31 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { ...@@ -714,25 +714,31 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL; char *endPtr = NULL;
double ts = (double)strtoll(value, &endPtr, 10); int64_t tsInt64 = strtoll(value, &endPtr, 10);
if(value + len != endPtr){ if(value + len != endPtr){
return -1; return -1;
} }
double ts = tsInt64;
switch (type) { switch (type) {
case TSDB_TIME_PRECISION_HOURS: case TSDB_TIME_PRECISION_HOURS:
ts *= (3600 * 1e9); ts *= (3600 * 1e9);
tsInt64 *= (3600 * 1e9);
break; break;
case TSDB_TIME_PRECISION_MINUTES: case TSDB_TIME_PRECISION_MINUTES:
ts *= (60 * 1e9); ts *= (60 * 1e9);
tsInt64 *= (60 * 1e9);
break; break;
case TSDB_TIME_PRECISION_SECONDS: case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9); ts *= (1e9);
tsInt64 *= (1e9);
break; break;
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
ts *= (1e6); ts *= (1e6);
tsInt64 *= (1e6);
break; break;
case TSDB_TIME_PRECISION_MICRO: case TSDB_TIME_PRECISION_MICRO:
ts *= (1e3); ts *= (1e3);
tsInt64 *= (1e3);
break; break;
case TSDB_TIME_PRECISION_NANO: case TSDB_TIME_PRECISION_NANO:
break; break;
...@@ -743,7 +749,7 @@ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { ...@@ -743,7 +749,7 @@ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
return -1; return -1;
} }
return (int64_t)ts; return tsInt64;
} }
static int64_t smlGetTimeNow(int8_t precision) { static int64_t smlGetTimeNow(int8_t precision) {
......
...@@ -1188,3 +1188,36 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { ...@@ -1188,3 +1188,36 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
destroyRequest(request); destroyRequest(request);
smlDestroyInfo(info); smlDestroyInfo(info);
} }
TEST(testCase, sml_TD15662_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_15662 precision 'ns'");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_15662");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"iyyyje,id=iyyyje_41943_1303,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
};
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
ASSERT_EQ(ret, 0);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_a5615048edae55218a22a149edebdc82");
ASSERT_NE(res, nullptr);
TAOS_ROW row = taos_fetch_row(res);
int64_t ts = *(int64_t*)row[0];
ASSERT_EQ(ts, 1626006833639000000);
taos_free_result(res);
}
\ No newline at end of file
此差异已折叠。
...@@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb; ...@@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
PutToQueueFp fp = pMsgCb->queueFps[qtype]; PutToQueueFp fp = pMsgCb->queueFps[qtype];
return (*fp)(pMsgCb->mgmt, pReq); return (*fp)(pMsgCb->mgmt, pMsg);
} }
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
...@@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { ...@@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*fp)(pMsgCb->mgmt, vgId, qtype); return (*fp)(pMsgCb->mgmt, vgId, qtype);
} }
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) { int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
SendReqFp fp = tsDefaultMsgCb.sendReqFp; SendReqFp fp = tsDefaultMsgCb.sendReqFp;
return (*fp)(epSet, pReq); return (*fp)(epSet, pMsg);
} }
void tmsgSendRsp(const SRpcMsg* pMsg) { void tmsgSendRsp(SRpcMsg* pMsg) {
SendRspFp fp = tsDefaultMsgCb.sendRspFp; SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pMsg); return (*fp)(pMsg);
} }
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) { void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
(*fp)(pMsg, pNewEpSet); (*fp)(pMsg, pNewEpSet);
} }
......
...@@ -374,39 +374,135 @@ char getPrecisionUnit(int32_t precision) { ...@@ -374,39 +374,135 @@ char getPrecisionUnit(int32_t precision) {
} }
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO); fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO || assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO); toPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}}; double tempResult = (double)time;
return (int64_t)((double)time * factors[fromPrecision][toPrecision]); switch(fromPrecision) {
case TSDB_TIME_PRECISION_MILLI: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time;
case TSDB_TIME_PRECISION_MICRO:
tempResult *= 1000;
time *= 1000;
goto end_;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000000;
time *= 1000000;
goto end_;
}
} // end from milli
case TSDB_TIME_PRECISION_MICRO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000;
case TSDB_TIME_PRECISION_MICRO:
return time;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000;
time *= 1000;
goto end_;
}
} //end from micro
case TSDB_TIME_PRECISION_NANO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000000;
case TSDB_TIME_PRECISION_MICRO:
return time / 1000;
case TSDB_TIME_PRECISION_NANO:
return time;
}
} //end from nano
default: {
assert(0);
return time; // only to pass windows compilation
}
} //end switch fromPrecision
end_:
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
return time;
} }
// !!!!notice:there are precision problems, double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
//int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
// fromPrecision == TSDB_TIME_PRECISION_NANO);
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
// toPrecision == TSDB_TIME_PRECISION_NANO);
// static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
// ((double)time * factors[fromPrecision][toPrecision]);
//}
// !!!!notice: double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) { int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO); fromPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3] = {1000000., 1000., 1.}; int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
double tmp = time;
switch (toUnit) { switch (toUnit) {
case 's': case 's':{
return time * factors[fromPrecision] / NANOSECOND_PER_SEC; tmp /= (NANOSECOND_PER_SEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_SEC/factors[fromPrecision]);
break;
}
case 'm': case 'm':
return time * factors[fromPrecision] / NANOSECOND_PER_MINUTE; tmp /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]);
break;
case 'h': case 'h':
return time * factors[fromPrecision] / NANOSECOND_PER_HOUR; tmp /= (NANOSECOND_PER_HOUR/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_HOUR/factors[fromPrecision]);
break;
case 'd': case 'd':
return time * factors[fromPrecision] / NANOSECOND_PER_DAY; tmp /= (NANOSECOND_PER_DAY/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_DAY/factors[fromPrecision]);
break;
case 'w': case 'w':
return time * factors[fromPrecision] / NANOSECOND_PER_WEEK; tmp /= (NANOSECOND_PER_WEEK/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_WEEK/factors[fromPrecision]);
break;
case 'a': case 'a':
return time * factors[fromPrecision] / NANOSECOND_PER_MSEC; tmp /= (NANOSECOND_PER_MSEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MSEC/factors[fromPrecision]);
break;
case 'u': case 'u':
return time * factors[fromPrecision] / NANOSECOND_PER_USEC; // the result of (NANOSECOND_PER_USEC/(double)factors[fromPrecision]) maybe a double
switch (fromPrecision) {
case TSDB_TIME_PRECISION_MILLI:{
tmp *= 1000;
time *= 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO:{
tmp /= 1;
time /= 1;
break;
}
case TSDB_TIME_PRECISION_NANO:{
tmp /= 1000;
time /= 1000;
break;
}
}
break;
case 'b': case 'b':
return time * factors[fromPrecision]; tmp *= factors[fromPrecision];
time *= factors[fromPrecision];
break;
default: { default: {
return -1; return -1;
} }
} }
if (tmp >= (double)INT64_MAX) return INT64_MAX;
if (tmp <= (double)INT64_MIN) return INT64_MIN;
return time;
} }
int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal) { int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal) {
......
...@@ -17,6 +17,15 @@ TARGET_INCLUDE_DIRECTORIES( ...@@ -17,6 +17,15 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/libs/common/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/common/inc"
) )
# dataformatTest.cpp
add_executable(dataformatTest "")
target_sources(
dataformatTest
PRIVATE
"dataformatTest.cpp"
)
target_link_libraries(dataformatTest gtest gtest_main util)
# tmsg test # tmsg test
# add_executable(tmsgTest "") # add_executable(tmsgTest "")
# target_sources(tmsgTest # target_sources(tmsgTest
......
#include "gtest/gtest.h"
\ No newline at end of file
#include <gtest/gtest.h>
#include "trow.h"
TEST(td_row_test, build_row_to_target) {
#if 0
char dst[1024];
SRow* pRow = (SRow*)dst;
int ncols = 10;
col_id_t cid;
void* pData;
SRowBuilder rb = trbInit(TD_OR_ROW_BUILDER, NULL, 0, pRow, 1024);
trbSetRowInfo(&rb, false, 0);
trbSetRowTS(&rb, 1637550210000);
for (int c = 0; c < ncols; c++) {
cid = c;
if (trbWriteCol(&rb, pData, cid) < 0) {
// TODO
}
}
#endif
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
...@@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { ...@@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
void dmStopStatusThread(SDnodeMgmt *pMgmt) { void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) { if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL); taosThreadJoin(pMgmt->statusThread, NULL);
taosThreadClear(&pMgmt->statusThread);
} }
} }
...@@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { ...@@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
void dmStopMonitorThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) { if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL); taosThreadJoin(pMgmt->monitorThread, NULL);
taosThreadClear(&pMgmt->monitorThread);
} }
} }
......
...@@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { ...@@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL); taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread);
} }
taosMemoryFree(pThread->pCfgs); taosMemoryFree(pThread->pCfgs);
} }
......
...@@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper); ...@@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper); void dmCleanupProc(struct SMgmtWrapper *pWrapper);
int32_t dmRunProc(SProc *proc); int32_t dmRunProc(SProc *proc);
void dmStopProc(SProc *proc); void dmStopProc(SProc *proc);
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle); void dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmCloseProcRpcHandles(SProc *proc); void dmCloseProcRpcHandles(SProc *proc);
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
void *handle, int64_t handleRef, EProcFuncType ftype); void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
// dmTransport.c // dmTransport.c
int32_t dmInitServer(SDnode *pDnode); int32_t dmInitServer(SDnode *pDnode);
......
...@@ -58,6 +58,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { ...@@ -58,6 +58,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
return -1; return -1;
} }
taosIgnSignal(SIGCHLD);
pWrapper->proc.pid = pid; pWrapper->proc.pid = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid); dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0; return 0;
...@@ -254,7 +255,7 @@ static void dmWatchNodes(SDnode *pDnode) { ...@@ -254,7 +255,7 @@ static void dmWatchNodes(SDnode *pDnode) {
if (!OnlyInParentProc(pWrapper)) continue; if (!OnlyInParentProc(pWrapper)) continue;
if (proc->pid <= 0 || !taosProcExist(proc->pid)) { if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid); dError("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
dmCloseProcRpcHandles(&pWrapper->proc); dmCloseProcRpcHandles(&pWrapper->proc);
dmNewProc(pWrapper, ntype); dmNewProc(pWrapper, ntype);
} }
......
...@@ -16,10 +16,7 @@ ...@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static inline int32_t CEIL8(int32_t v) { static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static int32_t dmInitProcMutex(SProcQueue *queue) { static int32_t dmInitProcMutex(SProcQueue *queue) {
TdThreadMutexAttr mattr = {0}; TdThreadMutexAttr mattr = {0};
...@@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { ...@@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
return queue; return queue;
} }
#if 0 static void dmCleanupProcQueue(SProcQueue *queue) {}
static void dmDestroyProcQueue(SProcQueue *queue) {
if (queue->mutex != NULL) {
taosThreadMutexDestroy(queue->mutex);
queue->mutex = NULL;
}
}
static void dmDestroyProcSem(SProcQueue *queue) {
if (queue->sem != NULL) {
tsem_destroy(queue->sem);
queue->sem = NULL;
}
}
#endif
static void dmCleanupProcQueue(SProcQueue *queue) { static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
#if 0 const void *pHead = pMsg;
if (queue != NULL) { const void *pBody = pMsg->pCont;
dmDestroyProcQueue(queue); const int16_t rawHeadLen = sizeof(SRpcMsg);
dmDestroyProcSem(queue); const int32_t rawBodyLen = pMsg->contLen;
} const int16_t headLen = CEIL8(rawHeadLen);
#endif
}
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
const int64_t handle = (int64_t)pMsg->info.handle;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (fullLen > queue->avail) { if (fullLen > queue->avail) {
...@@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe ...@@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
return -1; return -1;
} }
if (handle != 0 && ftype == DND_FUNC_REQ) { if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) { if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
return -1; return -1;
} }
...@@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe ...@@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
if (queue->tail < queue->head) { if (queue->tail < queue->head) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
queue->tail = queue->tail + 8 + headLen + bodyLen; queue->tail = queue->tail + 8 + headLen + bodyLen;
} else { } else {
int32_t remain = queue->total - queue->tail; int32_t remain = queue->total - queue->tail;
if (remain == 0) { if (remain == 0) {
memcpy(queue->pBuffer + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
queue->tail = 8 + headLen + bodyLen; queue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) { } else if (remain == 8) {
memcpy(queue->pBuffer, pHead, rawHeadLen); memcpy(queue->pBuffer, pHead, rawHeadLen);
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
queue->tail = headLen + bodyLen; queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8); memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen; queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen); queue->tail = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
queue->tail = queue->tail + headLen + bodyLen + 8; queue->tail = queue->tail + headLen + bodyLen + 8;
} }
} }
...@@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe ...@@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem); tsem_post(&queue->sem);
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
pBody, bodyLen, pos, queue->items); pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
return 0; return 0;
} }
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen, static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
EProcFuncType *pFuncType) {
tsem_wait(&queue->sem); tsem_wait(&queue->sem);
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
...@@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe ...@@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
int32_t bodyLen = CEIL8(rawBodyLen); int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = taosAllocateQitem(headLen, DEF_QITEM); void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
void *pBody = rpcMallocCont(bodyLen); void *pBody = NULL;
if (pHead == NULL || pBody == NULL) { if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem); tsem_post(&queue->sem);
taosFreeQitem(pHead); taosFreeQitem(pHead);
...@@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe ...@@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
const int32_t pos = queue->head; const int32_t pos = queue->head;
if (queue->head < queue->tail) { if (queue->head < queue->tail) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
queue->head = queue->head + 8 + headLen + bodyLen; queue->head = queue->head + 8 + headLen + bodyLen;
} else { } else {
int32_t remain = queue->total - queue->head; int32_t remain = queue->total - queue->head;
if (remain == 0) { if (remain == 0) {
memcpy(pHead, queue->pBuffer + 8, headLen); memcpy(pHead, queue->pBuffer + 8, headLen);
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
queue->head = 8 + headLen + bodyLen; queue->head = 8 + headLen + bodyLen;
} else if (remain == 8) { } else if (remain == 8) {
memcpy(pHead, queue->pBuffer, headLen); memcpy(pHead, queue->pBuffer, headLen);
memcpy(pBody, queue->pBuffer + headLen, bodyLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
queue->head = headLen + bodyLen; queue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8); memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8)); memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
queue->head = headLen - (remain - 8) + bodyLen; queue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen)); if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
queue->head = bodyLen - (remain - 8 - headLen); queue->head = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen); if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
queue->head = queue->head + headLen + bodyLen + 8; queue->head = queue->head + headLen + bodyLen + 8;
} }
} }
...@@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe ...@@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
queue->items--; queue->items--;
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
*ppHead = pHead; *ppMsg = pHead;
*ppBody = pBody; (*ppMsg)->pCont = pBody;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*pFuncType = (EProcFuncType)ftype; *pFuncType = (EProcFuncType)ftype;
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody, dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
bodyLen, pos, queue->items); (*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
return 1; return 1;
} }
...@@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) { ...@@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) {
SProc *proc = param; SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper; SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue; SProcQueue *queue = proc->cqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
int32_t code = 0; int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ; EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pReq = NULL; SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from cqueue", proc->name); dDebug("node:%s, start to consume from cqueue", proc->name);
do { do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype); numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name); dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
break; break;
...@@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) { ...@@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) {
} }
if (ftype != DND_FUNC_REQ) { if (ftype != DND_FUNC_REQ) {
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype); dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
taosFreeQitem(pHead); rpcFreeCont(pMsg->pCont);
rpcFreeCont(pBody); taosFreeQitem(pMsg);
} else { continue;
pReq = pHead; }
pReq->pCont = pBody;
code = dmProcessNodeMsg(pWrapper, pReq); code = dmProcessNodeMsg(pWrapper, pMsg);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr()); dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
SRpcMsg rspMsg = { SRpcMsg rsp = {
.info = pReq->info, .code = (terrno != 0 ? terrno : code),
.pCont = pReq->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pReq->info.rspLen, .contLen = pMsg->info.rspLen,
}; .info = pMsg->info,
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP); };
taosFreeQitem(pHead); dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
rpcFreeCont(pBody); rpcFreeCont(pMsg->pCont);
rpcFreeCont(rspMsg.pCont); taosFreeQitem(pMsg);
}
} }
} while (1); } while (1);
...@@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) { ...@@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) {
SProc *proc = param; SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper; SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue; SProcQueue *queue = proc->pqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
int32_t code = 0; int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ; EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pRsp = NULL; SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from pqueue", proc->name); dDebug("node:%s, start to consume from pqueue", proc->name);
do { do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype); numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name); dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
break; break;
...@@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) { ...@@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) {
} }
if (ftype == DND_FUNC_RSP) { if (ftype == DND_FUNC_RSP) {
pRsp = pHead; dmRemoveProcRpcHandle(proc, pMsg->info.handle);
pRsp->pCont = pBody; rpcSendResponse(pMsg);
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcSendResponse(pRsp);
} else if (ftype == DND_FUNC_REGIST) { } else if (ftype == DND_FUNC_REGIST) {
pRsp = pHead; rpcRegisterBrokenLinkArg(pMsg);
pRsp->pCont = pBody;
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
rpcRegisterBrokenLinkArg(pRsp);
} else if (ftype == DND_FUNC_RELEASE) { } else if (ftype == DND_FUNC_RELEASE) {
pRsp = pHead; dmRemoveProcRpcHandle(proc, pMsg->info.handle);
pRsp->pCont = NULL; rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
} else { } else {
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype); dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
rpcFreeCont(pBody); rpcFreeCont(pMsg->pCont);
} }
taosFreeQitem(pHead); taosFreeQitem(pMsg);
} while (1); } while (1);
return NULL; return NULL;
...@@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) { ...@@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
dmCleanupProcQueue(proc->cqueue); dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue); dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash); taosHashCleanup(proc->hash);
proc->hash = NULL;
dDebug("node:%s, proc is cleaned up", pWrapper->name); dDebug("node:%s, proc is cleaned up", pWrapper->name);
} }
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) { void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
int64_t h = (int64_t)handle; int64_t h = (int64_t)handle;
taosThreadMutexLock(&proc->cqueue->mutex); taosThreadMutexLock(&proc->cqueue->mutex);
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(proc->hash, &h, sizeof(int64_t)); taosHashRemove(proc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&proc->cqueue->mutex); taosThreadMutexUnlock(&proc->cqueue->mutex);
return ref;
} }
void dmCloseProcRpcHandles(SProc *proc) { void dmCloseProcRpcHandles(SProc *proc) {
taosThreadMutexLock(&proc->cqueue->mutex); taosThreadMutexLock(&proc->cqueue->mutex);
void *h = taosHashIterate(proc->hash, NULL); SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
while (h != NULL) { while (pInfo != NULL) {
void *handle = *((void **)h); dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
h = taosHashIterate(proc->hash, h); SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE};
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
pInfo = taosHashIterate(proc->hash, pInfo);
} }
taosHashClear(proc->hash); taosHashClear(proc->hash);
taosThreadMutexUnlock(&proc->cqueue->mutex); taosThreadMutexUnlock(&proc->cqueue->mutex);
} }
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
EProcFuncType ftype) {
int32_t retry = 0; int32_t retry = 0;
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) { while (1) {
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry); if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
retry++; break;
taosMsleep(retry); }
if (retry == 10) {
pMsg->code = terrno;
if (pMsg->contLen > 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
pMsg->contLen = 0;
}
dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
} else {
dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
retry++;
taosMsleep(retry);
}
} }
} }
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
void *handle, int64_t ref, EProcFuncType ftype) { return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
} }
...@@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
if (InParentProc(pWrapper)) { if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen, code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
DND_FUNC_REQ);
} else { } else {
code = dmProcessNodeMsg(pWrapper, pMsg); code = dmProcessNodeMsg(pWrapper, pMsg);
} }
...@@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
} }
} }
static inline void dmSendRsp(const SRpcMsg *pMsg) { static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP); dmSendRpcRedirectRsp(pMsg);
} else { } else {
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { if (InChildProc(pWrapper)) {
dmSendRpcRedirectRsp(pMsg); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else { } else {
rpcSendResponse(pMsg); rpcSendResponse(pMsg);
} }
} }
} }
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else { } else {
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet}; SMEpSet msg = {.epSet = *pNewEpSet};
...@@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe ...@@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
tSerializeSMEpSet(rsp.pCont, len, &msg); tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT; rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pRsp->info; rsp.info = pMsg->info;
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
} }
...@@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe ...@@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
} else { } else {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} }
...@@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { ...@@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SMgmtWrapper *pWrapper = pHandle->wrapper; SMgmtWrapper *pWrapper = pHandle->wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
SRpcMsg msg = {.code = type, .info = *pHandle}; SRpcMsg msg = {.code = type, .info = *pHandle};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
} else { } else {
rpcReleaseHandle(pHandle->handle, type); rpcReleaseHandle(pHandle->handle, type);
} }
......
...@@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) { ...@@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) {
pMnode->stopped = true; pMnode->stopped = true;
if (taosCheckPthreadValid(pMnode->thread)) { if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL); taosThreadJoin(pMnode->thread, NULL);
memset(&pMnode->thread, 0, sizeof(pMnode->thread)); taosThreadClear(&pMnode->thread);
} }
} }
......
...@@ -16,10 +16,9 @@ ...@@ -16,10 +16,9 @@
#include "tcache.h" #include "tcache.h"
void reportStartup(const char *name, const char *desc) {} void reportStartup(const char *name, const char *desc) {}
void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
// rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
return -1; return -1;
} }
......
...@@ -213,8 +213,9 @@ int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valL ...@@ -213,8 +213,9 @@ int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valL
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1; return -1;
} }
if (pVal != NULL) {
memcpy(pVal, pRaw->pData + dataPos, valLen); memcpy(pVal, pRaw->pData + dataPos, valLen);
}
return 0; return 0;
} }
...@@ -235,4 +236,4 @@ int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) { ...@@ -235,4 +236,4 @@ int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) {
} }
return sizeof(SSdbRaw) + pRaw->dataLen; return sizeof(SSdbRaw) + pRaw->dataLen;
} }
\ No newline at end of file
...@@ -2497,7 +2497,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo ...@@ -2497,7 +2497,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo
pInfo->min = MAX_TS_KEY; pInfo->min = MAX_TS_KEY;
pInfo->max = 0; pInfo->max = 0;
if (pCtx->numOfParams == 3) { if (pCtx->numOfParams == 2) {
pInfo->timeUnit = pCtx->param[1].param.i; pInfo->timeUnit = pCtx->param[1].param.i;
} else { } else {
pInfo->timeUnit = 1; pInfo->timeUnit = 1;
...@@ -2521,7 +2521,6 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { ...@@ -2521,7 +2521,6 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
} }
if (pInput->colDataAggIsSet) { if (pInput->colDataAggIsSet) {
if (pInfo->min == MAX_TS_KEY) { if (pInfo->min == MAX_TS_KEY) {
pInfo->min = GET_INT64_VAL(&pAgg->min); pInfo->min = GET_INT64_VAL(&pAgg->min);
pInfo->max = GET_INT64_VAL(&pAgg->max); pInfo->max = GET_INT64_VAL(&pAgg->max);
......
...@@ -361,7 +361,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) { ...@@ -361,7 +361,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1); *dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = *dst - tlen; *dst = (char*) * dst - tlen;
break; break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
......
...@@ -473,17 +473,16 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR ...@@ -473,17 +473,16 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
int32_t sz = 0; int32_t sz = 0;
char* ch = (char*)fstSliceData(s, &sz); char* ch = (char*)fstSliceData(s, &sz);
// char* tmp = taosMemoryCalloc(1, sz + 1); char* tmp = taosMemoryCalloc(1, sz + 1);
// memcpy(tmp, ch, sz); memcpy(tmp, ch, sz);
if (0 != strncmp(ch, p, skip)) { if (0 != strncmp(tmp, p, skip)) {
swsResultDestroy(rt); swsResultDestroy(rt);
// taosMemoryFree(tmp); taosMemoryFree(tmp);
break; break;
} }
TExeCond cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
if (MATCH == cond) { if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) { } else if (CONTINUE == cond) {
...@@ -491,7 +490,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR ...@@ -491,7 +490,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
swsResultDestroy(rt); swsResultDestroy(rt);
break; break;
} }
// taosMemoryFree(tmp); taosMemoryFree(tmp);
swsResultDestroy(rt); swsResultDestroy(rt);
} }
streamWithStateDestroy(st); streamWithStateDestroy(st);
......
...@@ -578,3 +578,40 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) { ...@@ -578,3 +578,40 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) {
EXPECT_EQ(1000, taosArrayGetSize(res)); EXPECT_EQ(1000, taosArrayGetSize(res));
} }
} }
TEST_F(JsonEnv, testWriteJsonTfileAndCache_DOUBLE) {
{
double val = 10.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i);
}
}
{
double val = 2.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i + 1000);
}
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 1.9;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(2000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
{
std::string colName("test1");
SArray* res = NULL;
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}
...@@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
int32_t ret = 0; int32_t ret = 0;
atomic_store_8(&io->isStart, 0); atomic_store_8(&io->isStart, 0);
taosThreadJoin(io->consumerTid, NULL); taosThreadJoin(io->consumerTid, NULL);
taosThreadClear(&io->consumerTid);
taosTmrCleanUp(io->timerMgr); taosTmrCleanUp(io->timerMgr);
return ret; return ret;
} }
......
...@@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0) } while (0)
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
...@@ -227,7 +227,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -227,7 +227,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
...@@ -924,8 +924,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -924,8 +924,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroyConn(pConn, true);
cliDestroy((uv_handle_t*)pConn->stream);
return -1; return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
......
...@@ -244,6 +244,7 @@ static void walStopThread() { ...@@ -244,6 +244,7 @@ static void walStopThread() {
if (taosCheckPthreadValid(tsWal.thread)) { if (taosCheckPthreadValid(tsWal.thread)) {
taosThreadJoin(tsWal.thread, NULL); taosThreadJoin(tsWal.thread, NULL);
taosThreadClear(&tsWal.thread);
} }
wDebug("wal thread is stopped"); wDebug("wal thread is stopped");
......
...@@ -145,6 +145,7 @@ void taosCloseLog() { ...@@ -145,6 +145,7 @@ void taosCloseLog() {
taosStopLog(); taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
} }
tsLogInited = 0; tsLogInited = 0;
......
...@@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) { ...@@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) { if (taosCheckPthreadValid(pSched->qthread[i])) {
taosThreadJoin(pSched->qthread[i], NULL); taosThreadJoin(pSched->qthread[i], NULL);
taosThreadClear(&pSched->qthread[i]);
} }
} }
......
...@@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
if (worker == NULL) continue; if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
} }
} }
...@@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) { ...@@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
SWWorker *worker = pool->workers + i; SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
} }
......
...@@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) {
taosMsleep(300); taosMsleep(300);
for (int32_t i = 0; i < numOfThreads; i++) { for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL); taosThreadJoin(pInfo[i].thread, NULL);
taosThreadClear(&pInfo[i].thread);
} }
int64_t maxDelay = 0; int64_t maxDelay = 0;
......
...@@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) { ...@@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
} }
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
......
...@@ -56,6 +56,7 @@ void simFreeScript(SScript *script) { ...@@ -56,6 +56,7 @@ void simFreeScript(SScript *script) {
bgScript->killed = true; bgScript->killed = true;
if (taosCheckPthreadValid(bgScript->bgPid)) { if (taosCheckPthreadValid(bgScript->bgPid)) {
taosThreadJoin(bgScript->bgPid, NULL); taosThreadJoin(bgScript->bgPid, NULL);
taosThreadClear(&bgScript->bgPid);
} }
simDebug("script:%s, background thread joined", bgScript->fileName); simDebug("script:%s, background thread joined", bgScript->fileName);
......
...@@ -985,6 +985,7 @@ int32_t shellExecute() { ...@@ -985,6 +985,7 @@ int32_t shellExecute() {
while (1) { while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn); taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
taosThreadJoin(shell.pid, NULL); taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
} }
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册