提交 c018a14e 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/stream

...@@ -54,6 +54,7 @@ typedef struct { ...@@ -54,6 +54,7 @@ typedef struct {
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
void* pTagCond; void* pTagCond;
void* pTagIndexCond; void* pTagIndexCond;
uint64_t suid;
} STableListInfo; } STableListInfo;
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
......
...@@ -33,7 +33,7 @@ struct SDataSink; ...@@ -33,7 +33,7 @@ struct SDataSink;
struct SSDataBlock; struct SSDataBlock;
typedef struct SDeleterRes { typedef struct SDeleterRes {
uint64_t uid; uint64_t suid;
SArray* uidList; SArray* uidList;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
...@@ -41,7 +41,8 @@ typedef struct SDeleterRes { ...@@ -41,7 +41,8 @@ typedef struct SDeleterRes {
} SDeleterRes; } SDeleterRes;
typedef struct SDeleterParam { typedef struct SDeleterParam {
SArray* pUidList; uint64_t suid;
SArray* pUidList;
} SDeleterParam; } SDeleterParam;
typedef struct SDataSinkStat { typedef struct SDataSinkStat {
......
...@@ -32,7 +32,7 @@ enum { ...@@ -32,7 +32,7 @@ enum {
}; };
typedef struct SDeleteRes { typedef struct SDeleteRes {
uint64_t uid; uint64_t suid;
SArray* uidList; SArray* uidList;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
......
...@@ -71,6 +71,7 @@ int32_t* taosGetErrno(); ...@@ -71,6 +71,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -76,22 +76,28 @@ void deltaToUtcInitOnce() { ...@@ -76,22 +76,28 @@ void deltaToUtcInitOnce() {
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
static char* forwardToTimeStringEnd(char* str); static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(const char* str, int32_t len); static bool checkTzPresent(const char* str, int32_t len);
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {parseLocaltime,
parseLocaltimeDst}; parseLocaltimeDst};
int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, utime, timePrec, 'T'); if (checkTzPresent(timestr, len)) {
} else if (checkTzPresent(timestr, len)) { return parseTimeWithTz(timestr, utime, timePrec, 'T');
return parseTimeWithTz(timestr, utime, timePrec, 0); } else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 'T');
}
} else { } else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); if (checkTzPresent(timestr, len)) {
return parseTimeWithTz(timestr, utime, timePrec, 0);
} else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 0);
}
} }
} }
...@@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { ...@@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
return true; return true;
} }
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char *str;
if (delim == 'T') {
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
} else if (delim == 0) {
str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
} else {
str = NULL;
}
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; //if parse failed, try "%Y-%m-%d" format
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1;
}
} }
#ifdef _MSC_VER #ifdef _MSC_VER
...@@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr ...@@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr
return 0; return 0;
} }
int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
tm.tm_isdst = -1; tm.tm_isdst = -1;
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char *str;
if (delim == 'T') {
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
} else if (delim == 0) {
str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
} else {
str = NULL;
}
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; //if parse failed, try "%Y-%m-%d" format
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1;
}
} }
/* mktime will be affected by TZ, set by using taos_options */ /* mktime will be affected by TZ, set by using taos_options */
......
...@@ -648,30 +648,30 @@ _OVER: ...@@ -648,30 +648,30 @@ _OVER:
} }
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
SQnodeObj *pQObj = NULL; SQnodeObj *pQObj = NULL;
SSnodeObj *pSObj = NULL; SSnodeObj *pSObj = NULL;
SMDropMnodeReq dropReq = {0}; SDropDnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }
mInfo("dnode:%d, start to drop", dropReq.dnodeId); mInfo("dnode:%d, start to drop, ep:%s:%d", dropReq.dnodeId, dropReq.fqdn, dropReq.port);
if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_MND_INVALID_DNODE_ID;
goto _OVER;
}
pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId); pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; char ep[TSDB_EP_LEN + 1] = {0};
goto _OVER; snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto _OVER;
}
} }
pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId); pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
...@@ -726,6 +726,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -726,6 +726,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
return -1; return -1;
} }
mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr()); mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr());
...@@ -742,7 +743,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -742,7 +743,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
tSerializeSMCfgDnodeReq(pBuf, bufLen, &cfgReq); tSerializeSMCfgDnodeReq(pBuf, bufLen, &cfgReq);
mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, pReq->info.ahandle); mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, pReq->info.ahandle);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
return tmsgSendReq(&epSet, &rpcMsg); return tmsgSendReq(&epSet, &rpcMsg);
} }
......
...@@ -29,6 +29,7 @@ target_sources( ...@@ -29,6 +29,7 @@ target_sources(
# sma # sma
"src/sma/sma.c" "src/sma/sma.c"
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"
"src/sma/smaTimeRange.c" "src/sma/smaTimeRange.c"
......
...@@ -34,7 +34,8 @@ extern "C" { ...@@ -34,7 +34,8 @@ extern "C" {
typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaStatItem SSmaStatItem; typedef struct STSmaStat STSmaStat;
typedef struct SRSmaStat SRSmaStat;
typedef struct SSmaKey SSmaKey; typedef struct SSmaKey SSmaKey;
typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfo SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem; typedef struct SRSmaInfoItem SRSmaInfoItem;
...@@ -45,26 +46,38 @@ struct SSmaEnv { ...@@ -45,26 +46,38 @@ struct SSmaEnv {
SSmaStat *pStat; SSmaStat *pStat;
}; };
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem)
struct SSmaStatItem { struct STSmaStat {
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
STSma *pTSma; // cache schema STSma *pTSma; // cache schema
STSchema *pTSchema; STSchema *pTSchema;
}; };
struct SRSmaStat {
SSma *pSma;
void *tmrHandle;
tmr_h tmrId;
int8_t tmrStat;
int32_t tmrSeconds;
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
};
struct SSmaStat { struct SSmaStat {
union { union {
SSmaStatItem tsmaStatItem; STSmaStat tsmaStat; // time-range-wise sma
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SRSmaStat rsmaStat; // rollup sma
}; };
T_REF_DECLARE() T_REF_DECLARE()
}; };
#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem) #define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash)
#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle)
#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
...@@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) { ...@@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
return 0; return 0;
} }
static FORCE_INLINE int8_t tdSmaStat(SSmaStatItem *pStatItem) { static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
if (pStatItem) { if (pTStat) {
return atomic_load_8(&pStatItem->state); return atomic_load_8(&pTStat->state);
} }
return TSDB_SMA_STAT_UNKNOWN; return TSDB_SMA_STAT_UNKNOWN;
} }
static FORCE_INLINE bool tdSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) { static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) {
if (!pStatItem) { if (!pTStat) {
return false; return false;
} }
if (state) { if (state) {
*state = atomic_load_8(&pStatItem->state); *state = atomic_load_8(&pTStat->state);
return *state == TSDB_SMA_STAT_OK; return *state == TSDB_SMA_STAT_OK;
} }
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK; return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK;
} }
static FORCE_INLINE bool tdSmaStatIsExpired(SSmaStatItem *pStatItem) { static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true; return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true;
} }
static FORCE_INLINE bool tdSmaStatIsDropped(SSmaStatItem *pStatItem) { static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true; return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true;
} }
static FORCE_INLINE void tdSmaStatSetOK(SSmaStatItem *pStatItem) { static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
if (pStatItem) { if (pTStat) {
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK); atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
} }
} }
static FORCE_INLINE void tdSmaStatSetExpired(SSmaStatItem *pStatItem) { static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
if (pStatItem) { if (pTStat) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED); atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
} }
} }
static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) { static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
if (pStatItem) { if (pTStat) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED); atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
} }
} }
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
...@@ -163,6 +174,51 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); ...@@ -163,6 +174,51 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
typedef struct STFInfo STFInfo;
typedef struct STFile STFile;
struct STFInfo {
uint32_t magic;
uint32_t ftype;
uint32_t fver;
uint64_t fsize;
};
struct STFile {
STFInfo info;
STfsFile f;
TdFilePtr pFile;
uint8_t state;
};
#define TD_FILE_F(tf) (&((tf)->f))
#define TD_FILE_PFILE(tf) ((tf)->pFile)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname)
#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_STATE(tf) ((tf)->state)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK)
#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD)
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile);
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -247,7 +247,6 @@ struct SVnode { ...@@ -247,7 +247,6 @@ struct SVnode {
struct STbUidStore { struct STbUidStore {
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock
SArray* tbUids; SArray* tbUids;
SHashObj* uidHash; SHashObj* uidHash;
}; };
......
...@@ -219,11 +219,9 @@ _err: ...@@ -219,11 +219,9 @@ _err:
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
metaRLock(pMeta);
TBC * pCur; TBC * pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) { if (ret < 0) {
metaULock(pMeta);
return ret; return ret;
} }
...@@ -249,6 +247,7 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -249,6 +247,7 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
tdbTbcClose(pCur); tdbTbcClose(pCur);
tdbFree(pKey); tdbFree(pKey);
return 0; return 0;
} }
...@@ -265,8 +264,8 @@ struct SMCtbCursor { ...@@ -265,8 +264,8 @@ struct SMCtbCursor {
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
SMCtbCursor *pCtbCur = NULL; SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey; SCtbIdxKey ctbIdxKey;
int ret; int ret = 0;
int c; int c = 0;
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur)); pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
if (pCtbCur == NULL) { if (pCtbCur == NULL) {
......
...@@ -375,6 +375,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { ...@@ -375,6 +375,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
metaWLock(pMeta); metaWLock(pMeta);
int ret = metaTtlSmaller(pMeta, ttl, tbUids); int ret = metaTtlSmaller(pMeta, ttl, tbUids);
if(ret != 0){ if(ret != 0){
metaULock(pMeta);
return ret; return ret;
} }
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
......
...@@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat; ...@@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat;
// declaration of static functions // declaration of static functions
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
static void *tdFreeTSmaStat(STSmaStat *pStat);
// implementation // implementation
...@@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) ...@@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path)
return NULL; return NULL;
} }
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) { if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv); tdFreeSmaEnv(pEnv);
return NULL; return NULL;
} }
...@@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { ...@@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
return 0; return 0;
} }
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
ASSERT(pSmaStat != NULL); ASSERT(pSmaStat != NULL);
if (*pSmaStat) { // no lock if (*pSmaStat) { // no lock
...@@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { ...@@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
} }
if (smaType == TSDB_SMA_TYPE_ROLLUP) { if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma;
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); // init timer
SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G");
if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE);
if (!SMA_STAT_INFO_HASH(*pSmaStat)) { // init hash
SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!SMA_RSMA_INFO_HASH(*pSmaStat)) {
if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat));
}
taosMemoryFreeClear(*pSmaStat); taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { ...@@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { static void *tdFreeTSmaStat(STSmaStat *pStat) {
if (pSmaStatItem) { if (pStat) {
tDestroyTSma(pSmaStatItem->pTSma); tDestroyTSma(pStat->pTSma);
taosMemoryFreeClear(pSmaStatItem->pTSma); taosMemoryFreeClear(pStat->pTSma);
taosMemoryFreeClear(pSmaStatItem); taosMemoryFreeClear(pStat);
} }
return NULL; return NULL;
} }
void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroySmaState(pSmaStat, smaType); tdDestroySmaState(pSmaStat, smaType);
taosMemoryFreeClear(pSmaStat); taosMemoryFreeClear(pSmaStat);
return NULL; return NULL;
...@@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdFreeSmaStatItem(&pSmaStat->tsmaStatItem); tdFreeTSmaStat(&pSmaStat->tsmaStat);
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
if (SMA_RSMA_TMR_HANDLE(pSmaStat)) {
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat));
}
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL);
while (infoHash) { while (infoHash) {
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pInfoHash); tdFreeRSmaInfo(pInfoHash);
infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash); infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash);
} }
taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat)); taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat));
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -273,4 +290,4 @@ void smaTimerCleanUp(void *timer, int8_t *initFlag) { ...@@ -273,4 +290,4 @@ void smaTimerCleanUp(void *timer, int8_t *initFlag) {
taosTmrCleanUp(timer); taosTmrCleanUp(timer);
atomic_store_8(initFlag, 0); atomic_store_8(initFlag, 0);
} }
} }
\ No newline at end of file
...@@ -137,4 +137,17 @@ int32_t smaClose(SSma *pSma) { ...@@ -137,4 +137,17 @@ int32_t smaClose(SSma *pSma) {
taosMemoryFreeClear(pSma); taosMemoryFreeClear(pSma);
} }
return 0; return 0;
}
/**
* @brief rsma env restore
*
* @param pSma
* @return int32_t
*/
int32_t smaRestore(SSma *pSma) {
if (!pSma) return 0;
// iterate all stables to restore the rsma env
return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -16,35 +16,17 @@ ...@@ -16,35 +16,17 @@
#include "sma.h" #include "sma.h"
#include "tstream.h" #include "tstream.h"
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
tb_uid_t suid, int8_t level); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
if (param->qmsg[__idx]) { \ int8_t idx);
pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \ tb_uid_t suid, int8_t level);
if (!pRSmaInfo->items[__idx].taskInfo) { \ static void tdRSmaFetchTrigger(void *param, void *tmrId);
goto _err; \ static void tdRSmaPersistTrigger(void *param, void *tmrId);
} \
pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \
if (param->maxdelay[__idx] < 1) { \
int64_t msInterval = \
convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \
pRSmaInfo->items[__idx].maxDelay = msInterval; \
} else { \
pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \
} \
if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \
pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \
} \
pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \
pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \
if (!pRSmaInfo->items[__idx].tmrHandle) { \
goto _err; \
} \
}
struct SRSmaInfoItem { struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo; SRSmaInfo *pRsmaInfo;
...@@ -56,14 +38,6 @@ struct SRSmaInfoItem { ...@@ -56,14 +38,6 @@ struct SRSmaInfoItem {
int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
int32_t maxDelay; int32_t maxDelay;
}; };
typedef struct {
int64_t suid;
SRSmaInfoItem *pItem;
SSma *pSma;
STSchema *pTSchema;
} SRSmaTriggerParam;
struct SRSmaInfo { struct SRSmaInfo {
STSchema *pTSchema; STSchema *pTSchema;
SSma *pSma; SSma *pSma;
...@@ -81,7 +55,7 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { ...@@ -81,7 +55,7 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
if (pInfo) { if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) { if (pItem->taskInfo) {
tdFreeTaskHandle(pItem->taskInfo); tdFreeTaskHandle(pItem->taskInfo);
...@@ -118,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA ...@@ -118,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_RSMA_INVALID_STAT; terrno = TSDB_CODE_RSMA_INVALID_STAT;
...@@ -187,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui ...@@ -187,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL; SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) { if (!pStat || !(infoHash = SMA_RSMA_INFO_HASH(pStat))) {
terrno = TSDB_CODE_RSMA_INVALID_STAT; terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -213,6 +187,40 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui ...@@ -213,6 +187,40 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *pReadHandle,
int8_t idx) {
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
if (param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->pRsmaInfo = pRSmaInfo;
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], pReadHandle);
if (!pItem->taskInfo) {
goto _err;
}
pItem->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
pItem->maxDelay = (int32_t)msInterval;
} else {
pItem->maxDelay = (int32_t)param->maxdelay[idx];
}
if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
}
pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2);
pItem->tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!pItem->tmrHandle) {
goto _err;
}
}
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
/** /**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
* *
...@@ -246,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { ...@@ -246,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
if (pRSmaInfo) { if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
...@@ -282,14 +290,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { ...@@ -282,14 +290,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
pRSmaInfo->pSma = pSma; pRSmaInfo->pSma = pSma;
pRSmaInfo->suid = pReq->suid; pRSmaInfo->suid = pReq->suid;
SRetention *pRetention = SMA_RETENTION(pSma); if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); goto _err;
}
SET_RSMA_INFO_ITEM_PARAMS(0, 1); if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 1) < 0) {
SET_RSMA_INFO_ITEM_PARAMS(1, 2); goto _err;
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != if (taosHashPut(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} else { } else {
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
...@@ -418,7 +426,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -418,7 +426,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
if (!pBlock) break; if (!pBlock) break;
tdUidStorePut(pStore, msgIter.suid, NULL); tdUidStorePut(pStore, msgIter.suid, NULL);
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
} }
if (terrno != TSDB_CODE_SUCCESS) return -1; if (terrno != TSDB_CODE_SUCCESS) return -1;
...@@ -439,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -439,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
if (!output) { if (!output) {
break; break;
} }
if (!pResult) { if (!pResult) {
pResult = taosArrayInit(0, sizeof(SSDataBlock)); pResult = taosArrayInit(1, sizeof(SSDataBlock));
if (!pResult) { if (!pResult) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -451,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -451,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
} }
if (taosArrayGetSize(pResult) > 0) { if (taosArrayGetSize(pResult) > 0) {
#if 0 #if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowData(pResult, flag); blockDebugShowData(pResult, flag);
...@@ -459,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -459,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
taosArrayDestroy(pResult); goto _err;
return TSDB_CODE_FAILED;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED; goto _err;
} }
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
...@@ -479,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -479,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
} }
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return 0; return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(pResult);
return TSDB_CODE_FAILED;
} }
/** /**
...@@ -488,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -488,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
* @param param * @param param
* @param tmrId * @param tmrId
*/ */
static void rsmaTriggerByTimer(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
// SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param;
// SRSmaInfoItem *pItem = pParam->pItem;
SRSmaInfoItem *pItem = param; SRSmaInfoItem *pItem = param;
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is active for tb suid:%" PRIi64, __func__, __LINE__,
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
...@@ -502,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) { ...@@ -502,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) {
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
} else { } else {
smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is inactive for tb suid:%" PRIi64, __func__, __LINE__,
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
} }
// taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
} }
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
...@@ -518,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -518,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
pItem->taskInfo, suid); pItem->taskInfo, suid);
// inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1) if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) {
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema};
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); smaWarn("%s:%d THREAD:%" PRIi64 " process rsma insert", __func__, __LINE__, taosGetSelfPthreadId());
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
taosTmrStart(tdRSmaPersistTrigger, 5000, pStat, pStat->tmrHandle);
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -542,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb ...@@ -542,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
...@@ -594,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -594,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char* outputName) {
tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName);
}
static void *tdRSmaPersistExec(void *param) {
setThreadName("rsma-task-persist");
SRSmaStat *pRSmaStat = param;
SSma *pSma = pRSmaStat->pSma;
STfs *pTfs = pSma->pVnode->pTfs;
int64_t toffset = 0;
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
if (!infoHash) {
goto _end;
}
STFile tFile = {0};
int32_t vid = 2;
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
tdCreateTFile(&tFile, pTfs, true, -1);
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
char *pOutput = NULL;
int32_t len = 0;
if (qSerializeTaskStatus(pRSmaInfo->items[0].taskInfo, &pOutput, &len) < 0) {
smaError("serialize rsma task for table %" PRIi64 " failed since %s", pRSmaInfo->items[0].pRsmaInfo->suid,
terrstr(terrno));
} else {
smaWarn("serialize rsma task for table %" PRIi64 " success and len is %d", pRSmaInfo->items[0].pRsmaInfo->suid,
len);
}
tdAppendTFile(&tFile, &len, sizeof(len), &toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset);
taosMemoryFree(pOutput);
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
}
_end:
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno));
tdCloseTFile(&tFile);
tdRemoveTFile(&tFile);
return NULL;
}
tdCloseTFile(&tFile);
char newFName[TSDB_FILENAME_LEN];
strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]);
strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName);
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
return NULL;
_err:
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
// remove the .tmp file
return NULL;
}
static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
smaWarn("%s:%d entry ", __func__, __LINE__);
TdThread threadId;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
if (taosThreadCreate(&threadId, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) {
smaError("failed to create thread to persist rsma qTaskInfo since %s", strerror(errno));
}
taosThreadAttrDestroy(&thAttr);
smaWarn("%s:%d end ", __func__, __LINE__);
}
/**
* @brief trigger to persist rsma qTaskInfo
*
* @param param
* @param tmrId
*/
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *pRSmaStat = param;
if (atomic_load_8(&pRSmaStat->tmrStat) == TASK_TRIGGER_STATUS__ACTIVE) {
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence start since active", __func__, __LINE__, taosGetSelfPthreadId());
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__IN_ACTIVE);
// execution
tdRSmaPersistTask(pRSmaStat);
} else {
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence not start since inactive", __func__, __LINE__,
taosGetSelfPthreadId());
}
taosTmrReset(tdRSmaPersistTrigger, 3600000, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
}
\ No newline at end of file
...@@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
SSmaStat *pStat = NULL; SSmaStat *pStat = NULL;
SSmaStatItem *pItem = NULL; STSmaStat *pItem = NULL;
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
terrno = TSDB_CODE_TSMA_INVALID_STAT; terrno = TSDB_CODE_TSMA_INVALID_STAT;
...@@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
tdRefSmaStat(pSma, pStat); tdRefSmaStat(pSma, pStat);
pItem = &pStat->tsmaStatItem; pItem = &pStat->tsmaStat;
ASSERT(pItem); ASSERT(pItem);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU64(buf, pInfo->fsize);
return tlen;
}
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU64(buf, &(pInfo->fsize));
return buf;
}
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_FILE_OPENED(pTFile));
int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int32_t tdUpdateTFileHeader(STFile *pTFile) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
void *ptr = buf;
tdEncodeTFInfo(&ptr, &(pTFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE);
if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
return 0;
}
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
ASSERT(TD_FILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
void *pBuf = buf;
pBuf = tdDecodeTFInfo(pBuf, pInfo);
return 0;
}
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_FILE_OPENED(pTFile));
int64_t toffset;
if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) {
return -1;
}
ASSERT(pTFile->info.fsize == toffset);
if (offset) {
*offset = toffset;
}
if (tdWriteTFile(pTFile, buf, nbyte) < 0) {
return -1;
}
pTFile->info.fsize += nbyte;
return nbyte;
}
int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_FILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tdCloseTFile(STFile *pTFile) {
if (TD_FILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile);
TD_FILE_SET_CLOSED(pTFile);
}
}
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname);
}
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
char fullname[TSDB_FILENAME_LEN];
SDiskID did = {0};
TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_FILE_SET_CLOSED(pTFile);
memset(&(pTFile->info), 0, sizeof(pTFile->info));
pTFile->info.magic = TD_FILE_INIT_MAGIC;
if (tfsAllocDisk(pTfs, 0, &did) < 0) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return -1;
}
tfsInitFile(pTfs, &(pTFile->f), did, fname);
return 0;
}
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TD_FILE_REL_NAME(pTFile));
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) {
taosMemoryFreeClear(s);
return -1;
}
taosMemoryFreeClear(s);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} else {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
if (!updateHeader) {
return 0;
}
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0;
if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile);
tdRemoveTFile(pTFile);
return -1;
}
return 0;
}
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); }
\ No newline at end of file
...@@ -86,7 +86,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp ...@@ -86,7 +86,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
SDeleterRes* pRes = (SDeleterRes*)pEntry->data; SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->uid = pHandle->pDeleter->tableId; pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList; pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
......
...@@ -300,6 +300,8 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -300,6 +300,8 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
uint64_t tableUid = pScanNode->uid; uint64_t tableUid = pScanNode->uid;
pListInfo->suid = pScanNode->suid;
SNode* pTagCond = (SNode*)pListInfo->pTagCond; SNode* pTagCond = (SNode*)pListInfo->pTagCond;
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond; SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
......
...@@ -4465,6 +4465,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT ...@@ -4465,6 +4465,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList);
pDeleterParam->suid = pTask->tableqinfoList.suid;
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
if (NULL == pDeleterParam->pUidList) { if (NULL == pDeleterParam->pUidList) {
taosMemoryFree(pDeleterParam); taosMemoryFree(pDeleterParam);
......
...@@ -271,7 +271,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen ...@@ -271,7 +271,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen
SDeleterRes* pDelRes = (SDeleterRes*)output.pData; SDeleterRes* pDelRes = (SDeleterRes*)output.pData;
rsp.affectedRows = pDelRes->affectedRows; rsp.affectedRows = pDelRes->affectedRows;
pRes->uid = pDelRes->uid; pRes->suid = pDelRes->suid;
pRes->uidList = pDelRes->uidList; pRes->uidList = pDelRes->uidList;
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
......
...@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") ...@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
...@@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo ...@@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "TSDB no available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
......
...@@ -233,6 +233,7 @@ if $data(1)[3] != dropping then ...@@ -233,6 +233,7 @@ if $data(1)[3] != dropping then
endi endi
print =============== step9: start mnode1 and wait it dropped print =============== step9: start mnode1 and wait it dropped
sleep 3000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
$x = 0 $x = 0
......
...@@ -233,12 +233,12 @@ if $rows != 4 then ...@@ -233,12 +233,12 @@ if $rows != 4 then
endi endi
sql show dnode 1 variables; sql show dnode 1 variables;
if $rows != 114 then if $rows <= 0 then
return -1 return -1
endi endi
sql show local variables; sql show local variables;
if $rows != 50 then if $rows <= 0 then
return -1 return -1
endi endi
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
@REM python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udfTest.py
@REM python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_create.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\cachelast.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册