提交 74b4e1c3 编写于 作者: D dapan1121

Merge branch 'feature/scheduler' of github.com:taosdata/TDengine into feature/scheduler

...@@ -240,12 +240,12 @@ typedef struct { ...@@ -240,12 +240,12 @@ typedef struct {
} SSubmitBlkIter; } SSubmitBlkIter;
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
void* pMsg; const void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
...@@ -2099,6 +2099,11 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { ...@@ -2099,6 +2099,11 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
} }
} }
static FORCE_INLINE void tdFreeTSmaWrapper(STSmaWrapper* pSW) {
tdDestroyTSmaWrapper(pSW);
tfree(pSW);
}
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0; int32_t tlen = 0;
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
......
...@@ -458,16 +458,21 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { ...@@ -458,16 +458,21 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
if (duration == 0) { if (duration == 0) {
return t; return t;
} }
if (unit == 'y') {
duration *= 12; if (unit != 'n' && unit != 'y') {
} else if (unit != 'n') {
return t + duration; return t + duration;
} }
// The following code handles the y/n time duration
int64_t numOfMonth = duration;
if (unit == 'y') {
numOfMonth *= 12;
}
struct tm tm; struct tm tm;
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
taosLocalTime(&tt, &tm); taosLocalTime(&tt, &tm);
int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)duration; int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)numOfMonth;
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
...@@ -574,12 +579,23 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -574,12 +579,23 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
} }
} }
ASSERT(pInterval->offset >= 0);
if (pInterval->offset > 0) { if (pInterval->offset > 0) {
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
if (start > t) { if (start > t) {
start = taosTimeAdd(start, -pInterval->interval, pInterval->intervalUnit, precision); start = taosTimeAdd(start, -pInterval->interval, pInterval->intervalUnit, precision);
} else {
// try to move current window to the left-hande-side, due to the offset effect.
int64_t end = start + pInterval->interval - 1;
ASSERT(end >= t);
end = taosTimeAdd(end, -pInterval->sliding, pInterval->slidingUnit, precision);
if (end >= t) {
start = taosTimeAdd(start, -pInterval->sliding, pInterval->slidingUnit, precision);
}
} }
} }
return start; return start;
} }
......
...@@ -87,6 +87,15 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); ...@@ -87,6 +87,15 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb); int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb);
/**
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg);
/** /**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
* *
...@@ -95,11 +104,18 @@ int tsdbCommit(STsdb *pTsdb); ...@@ -95,11 +104,18 @@ int tsdbCommit(STsdb *pTsdb);
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
/**
* @brief Drop tSma data and local cache.
*
* @param pTsdb
* @param indexUid
* @return int32_t
*/
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
/** /**
* @brief Insert RSma(Time-range-wise Rollup SMA) data. * @brief Insert RSma(Rollup SMA) data.
* *
* @param pTsdb * @param pTsdb
* @param msg * @param msg
...@@ -108,6 +124,20 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); ...@@ -108,6 +124,20 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// TODO: This is the basic params, and should wrap the params to a queryHandle. // TODO: This is the basic params, and should wrap the params to a queryHandle.
/**
* @brief Get tSma(Time-range-wise SMA) data.
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult); tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult);
......
...@@ -63,6 +63,7 @@ struct STsdb { ...@@ -63,6 +63,7 @@ struct STsdb {
#define REPO_ID(r) ((r)->vgId) #define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config) #define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs #define REPO_FS(r) (r)->fs
#define REPO_META(r) (r)->pMeta
#define REPO_TFS(r) (r)->pTfs #define REPO_TFS(r) (r)->pTfs
#define IS_REPO_LOCKED(r) (r)->repoLocked #define IS_REPO_LOCKED(r) (r)->repoLocked
#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv) #define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv)
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_SMA_H_ #ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_
#define TSDB_SMA_TEST // remove after test finished
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnv SSmaEnv;
......
...@@ -38,7 +38,7 @@ typedef enum { ...@@ -38,7 +38,7 @@ typedef enum {
} ESmaStorageLevel; } ESmaStorageLevel;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb *pTsdb;
SDBFile dFile; SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
} STSmaWriteH; } STSmaWriteH;
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
} SmaFsIter; } SmaFsIter;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb *pTsdb;
SDBFile dFile; SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
int32_t blockSize; // size of SMA block item int32_t blockSize; // size of SMA block item
...@@ -69,7 +69,7 @@ typedef struct { ...@@ -69,7 +69,7 @@ typedef struct {
*/ */
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A SHashObj *expiredWindows; // key: skey of time window, value: N/A
STSma * pSma; // cache schema STSma *pSma; // cache schema
} SSmaStatItem; } SSmaStatItem;
struct SSmaStat { struct SSmaStat {
...@@ -80,9 +80,9 @@ struct SSmaStat { ...@@ -80,9 +80,9 @@ struct SSmaStat {
// declaration of static functions // declaration of static functions
// expired window // expired window
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void * tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
...@@ -124,7 +124,7 @@ static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) { ...@@ -124,7 +124,7 @@ static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
} }
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) { static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
if(!pStatItem) { if (!pStatItem) {
return false; return false;
} }
...@@ -384,49 +384,20 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { ...@@ -384,49 +384,20 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}; };
/** static STimeWindow getActiveTimeWindowX(int64_t ts, SInterval* pInterval) {
* @brief Update expired window according to msg from stream computing module. STimeWindow tw = {0};
* tw.skey = 100;
* @param pTsdb tw.ekey = 1000;
* @param smaType ETsdbSmaType return tw;
* @param msg }
* @return int32_t
*/
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
if (!msg || !pTsdb->pMeta) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
// TODO: decode the msg from Stream Computing module => start
int64_t indexUid = SMA_TEST_INDEX_UID;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
TSKEY skey1 = 1646987196 * 1e3;
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
expiredWindows[i] = skey1 + i;
}
// TODO: decode the msg <= end
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);
tsdbRefSmaStat(pTsdb, pStat); static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
if (pItem == NULL) { if (pItem == NULL) {
// Response to stream computing: OOM // Response to stream computing: OOM
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -436,7 +407,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { ...@@ -436,7 +407,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
free(pItem); free(pItem);
tsdbUnRefSmaStat(pTsdb, pStat);
tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
tstrerror(terrno)); tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -447,34 +417,150 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { ...@@ -447,34 +417,150 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
// If error occurs during put smaStatItem, free the resources of pItem // If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
free(pItem); free(pItem);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
int8_t state = TSDB_SMA_STAT_EXPIRED; int8_t state = TSDB_SMA_STAT_EXPIRED;
if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &state, sizeof(state)) != 0) {
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows);
tfree(pItem->pSma);
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
return TSDB_CODE_FAILED;
}
tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, winSKey);
}
/**
* @brief Update expired window according to msg from stream computing module.
*
* @param pTsdb
* @param msg SSubmitReq
* @return int32_t
*/
int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
const SSubmitReq *pMsg = (const SSubmitReq *)msg;
if (pMsg->length <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return TSDB_CODE_FAILED;
}
if (!pTsdb->pMeta) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
int64_t indexUid = SMA_TEST_INDEX_UID;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
TSKEY skey1 = 1646987196 * 1e3;
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
if (taosHashPut(pItem->expiredWindows, expiredWindows + i, sizeof(TSKEY), &state, sizeof(state)) != 0) { expiredWindows[i] = skey1 + i;
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would }
// tell query module to query raw TS data. #else
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut. #endif
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired // TODO: decode the msg <= end
// windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows); if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
tfree(pItem->pSma); terrno = TSDB_CODE_TDB_INIT_FAILED;
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); return TSDB_CODE_FAILED;
tsdbUnRefSmaStat(pTsdb, pStat); }
return TSDB_CODE_FAILED;
#ifndef TSDB_SMA_TEST
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
#endif
// Firstly, assume that tSma can only be created on super table/normal table.
// getActiveTimeWindow
SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, TSDB_SMA_TYPE_TIME_RANGE);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SInterval interval = {0};
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
// basic procedure
// TODO: optimization
tsdbRefSmaStat(pTsdb, pStat);
while (true) {
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
int64_t suid = htobe64(pBlock->uid);
STSmaWrapper *pSW = NULL;
STSma *pTSma = NULL;
while (true) {
SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
tdFreeTSmaWrapper(pSW);
break;
}
STSRow *row = tGetSubmitBlkNext(&blkIter);
if (row == NULL) {
tdFreeTSmaWrapper(pSW);
break;
}
if(pSW == NULL) {
if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), suid)) == NULL) {
break;
}
if((pSW->number) <= 0 || (pSW->tSma == NULL)) {
tdFreeTSmaWrapper(pSW);
break;
}
pTSma = pSW->tSma;
}
interval.interval = pTSma->interval;
interval.intervalUnit = pTSma->intervalUnit;
interval.offset = pTSma->offset;
interval.precision = REPO_CFG(pTsdb)->precision;
interval.sliding = pTSma->sliding;
interval.slidingUnit = pTSma->slidingUnit;
STimeWindow tw = getActiveTimeWindowX(TD_ROW_KEY(row), &interval);
tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, TD_ROW_KEY(row));
} }
tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid,
expiredWindows[i]);
} }
tsdbUnRefSmaStat(pTsdb, pStat); tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* @brief When sma data received from stream computing, make the relative expired window valid.
*
* @param pTsdb
* @param pStat
* @param indexUid
* @param skey
* @return int32_t
*/
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) { static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
SSmaStatItem *pItem = NULL; SSmaStatItem *pItem = NULL;
...@@ -582,7 +668,7 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k ...@@ -582,7 +668,7 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_ #ifdef _TEST_SMA_PRINT_DEBUG_LOG_
uint32_t valueSize = 0; uint32_t valueSize = 0;
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize); void *data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
ASSERT(data != NULL); ASSERT(data != NULL);
for (uint32_t v = 0; v < valueSize; v += 8) { for (uint32_t v = 0; v < valueSize; v += 8) {
tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v)); tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
...@@ -699,7 +785,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p ...@@ -699,7 +785,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
assert(pTbData->dataLen > 0); assert(pTbData->dataLen > 0);
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen); STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen);
char smaKey[SMA_KEY_LEN] = {0}; char smaKey[SMA_KEY_LEN] = {0};
void * pSmaKey = &smaKey; void *pSmaKey = &smaKey;
#if 0 #if 0
printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n", printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
...@@ -773,11 +859,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe ...@@ -773,11 +859,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg *pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
int64_t indexUid = SMA_TEST_INDEX_UID; int64_t indexUid = SMA_TEST_INDEX_UID;
if (pEnv == NULL) { if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
...@@ -797,7 +882,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { ...@@ -797,7 +882,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
SSmaStatItem *pItem = NULL; SSmaStatItem *pItem = NULL;
tsdbRefSmaStat(pTsdb, pStat); tsdbRefSmaStat(pTsdb, pStat);
...@@ -812,8 +897,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { ...@@ -812,8 +897,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
char rPath[TSDB_FILENAME_LEN] = {0}; char rPath[TSDB_FILENAME_LEN] = {0};
char aPath[TSDB_FILENAME_LEN] = {0}; char aPath[TSDB_FILENAME_LEN] = {0};
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath); tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
if (!taosCheckExistFile(aPath)) { if (!taosCheckExistFile(aPath)) {
...@@ -859,9 +944,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { ...@@ -859,9 +944,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
/** /**
* @brief Drop tSma data and local cache * @brief Drop tSma data and local cache
* - insert/query reference * - insert/query reference
* @param pTsdb * @param pTsdb
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
...@@ -902,8 +987,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { ...@@ -902,8 +987,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
} }
} }
// clear sma data files // clear sma data files
// TODO: // TODO:
} }
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
...@@ -917,9 +1001,9 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, ...@@ -917,9 +1001,9 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
} }
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg *pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
if (pEnv == NULL) { if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
...@@ -1137,7 +1221,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ ...@@ -1137,7 +1221,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN); *(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
void * result = NULL; void *result = NULL;
uint32_t valueSize = 0; uint32_t valueSize = 0;
if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64
...@@ -1219,15 +1303,8 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) { ...@@ -1219,15 +1303,8 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
} }
#endif #endif
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine // TODO: Who is responsible for resource allocate and release?
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) { if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
...@@ -1236,21 +1313,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { ...@@ -1236,21 +1313,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
return code; return code;
} }
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
} }
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) { if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
...@@ -1259,20 +1329,7 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { ...@@ -1259,20 +1329,7 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
return code; return code;
} }
/**
* @brief Get tSma data
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1283,13 +1340,7 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, ...@@ -1283,13 +1340,7 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid,
return code; return code;
} }
/**
* @brief Drop tSma Data and caches
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) { int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) { if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) {
......
...@@ -32,48 +32,4 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { ...@@ -32,48 +32,4 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
} }
} }
return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL);
} }
\ No newline at end of file
#if 0
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
#endif
\ No newline at end of file
...@@ -98,7 +98,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { ...@@ -98,7 +98,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
tSma.slidingUnit = TIME_UNIT_HOUR; tSma.slidingUnit = TIME_UNIT_HOUR;
tSma.sliding = 0; tSma.sliding = 0;
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN); tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN); tSma.timezoneInt = 8;
tSma.indexUid = 2345678910; tSma.indexUid = 2345678910;
tSma.tableUid = 1234567890; tSma.tableUid = 1234567890;
...@@ -128,7 +128,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { ...@@ -128,7 +128,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
ASSERT_EQ(pSma->intervalUnit, qSma->intervalUnit); ASSERT_EQ(pSma->intervalUnit, qSma->intervalUnit);
ASSERT_EQ(pSma->slidingUnit, qSma->slidingUnit); ASSERT_EQ(pSma->slidingUnit, qSma->slidingUnit);
ASSERT_STRCASEEQ(pSma->indexName, qSma->indexName); ASSERT_STRCASEEQ(pSma->indexName, qSma->indexName);
ASSERT_STRCASEEQ(pSma->timezone, qSma->timezone); ASSERT_EQ(pSma->timezoneInt, qSma->timezoneInt);
ASSERT_EQ(pSma->indexUid, qSma->indexUid); ASSERT_EQ(pSma->indexUid, qSma->indexUid);
ASSERT_EQ(pSma->tableUid, qSma->tableUid); ASSERT_EQ(pSma->tableUid, qSma->tableUid);
ASSERT_EQ(pSma->interval, qSma->interval); ASSERT_EQ(pSma->interval, qSma->interval);
...@@ -150,7 +150,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { ...@@ -150,7 +150,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2"; const char * smaIndexName2 = "sma_index_test_2";
const char * timezone = "Asia/Shanghai"; int8_t timezone = 8;
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "I'm tags filter"; const char * tagsFilter = "I'm tags filter";
const char * smaTestDir = "./smaTest"; const char * smaTestDir = "./smaTest";
...@@ -167,7 +167,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ...@@ -167,7 +167,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tSma.sliding = 0; tSma.sliding = 0;
tSma.indexUid = indexUid1; tSma.indexUid = indexUid1;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN); tSma.timezoneInt = 8;
tSma.tableUid = tbUid; tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr); tSma.exprLen = strlen(expr);
...@@ -207,7 +207,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ...@@ -207,7 +207,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1); qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1);
assert(qSmaCfg != NULL); assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName); printf("name1 = %s\n", qSmaCfg->indexName);
printf("timezone1 = %s\n", qSmaCfg->timezone); printf("timezone1 = %" PRIi8 "\n", qSmaCfg->timezoneInt);
printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : ""); printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
...@@ -218,7 +218,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ...@@ -218,7 +218,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2); qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2);
assert(qSmaCfg != NULL); assert(qSmaCfg != NULL);
printf("name2 = %s\n", qSmaCfg->indexName); printf("name2 = %s\n", qSmaCfg->indexName);
printf("timezone2 = %s\n", qSmaCfg->timezone); printf("timezone2 = %" PRIi8 "\n", qSmaCfg->timezoneInt);
printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : ""); printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
...@@ -246,13 +246,13 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ...@@ -246,13 +246,13 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
assert(pSW != NULL); assert(pSW != NULL);
ASSERT_EQ(pSW->number, nCntTSma); ASSERT_EQ(pSW->number, nCntTSma);
ASSERT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); ASSERT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
ASSERT_STRCASEEQ(pSW->tSma->timezone, timezone); ASSERT_EQ(pSW->tSma->timezoneInt, timezone);
ASSERT_STRCASEEQ(pSW->tSma->expr, expr); ASSERT_STRCASEEQ(pSW->tSma->expr, expr);
ASSERT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter); ASSERT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter);
ASSERT_EQ(pSW->tSma->indexUid, indexUid1); ASSERT_EQ(pSW->tSma->indexUid, indexUid1);
ASSERT_EQ(pSW->tSma->tableUid, tbUid); ASSERT_EQ(pSW->tSma->tableUid, tbUid);
ASSERT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); ASSERT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
ASSERT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone); ASSERT_EQ((pSW->tSma + 1)->timezoneInt, timezone);
ASSERT_STRCASEEQ((pSW->tSma + 1)->expr, expr); ASSERT_STRCASEEQ((pSW->tSma + 1)->expr, expr);
ASSERT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter); ASSERT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter);
ASSERT_EQ((pSW->tSma + 1)->indexUid, indexUid2); ASSERT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
...@@ -284,7 +284,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ...@@ -284,7 +284,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
TEST(testCase, tSma_Data_Insert_Query_Test) { TEST(testCase, tSma_Data_Insert_Query_Test) {
// step 1: prepare meta // step 1: prepare meta
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * timezone = "Asia/Shanghai"; const int8_t timezone = 8;
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'"; const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'";
const char * smaTestDir = "./smaTest"; const char * smaTestDir = "./smaTest";
...@@ -305,7 +305,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -305,7 +305,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma.sliding = 0; tSma.sliding = 0;
tSma.indexUid = indexUid1; tSma.indexUid = indexUid1;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN); tSma.timezoneInt = timezone;
tSma.tableUid = tbUid; tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr); tSma.exprLen = strlen(expr);
...@@ -369,7 +369,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -369,7 +369,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
char *msg = (char *)calloc(1, 100); char *msg = (char *)calloc(1, 100);
ASSERT_NE(msg, nullptr); ASSERT_NE(msg, nullptr);
ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, msg), 0);
// init // init
int32_t allocCnt = 0; int32_t allocCnt = 0;
......
...@@ -7683,7 +7683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7683,7 +7683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO; pInfo->precision = TSDB_TIME_PRECISION_MILLI;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
......
...@@ -1480,13 +1480,14 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { ...@@ -1480,13 +1480,14 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: { case TSDB_DATA_TYPE_VARBINARY: {
pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1); pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1 + 100);
if (NULL == pNode->datum.p) { if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); varDataSetLen(pNode->datum.p, pNode->node.resType.bytes);
code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p));
nodesDebug("!!!!!!!!!len:%d,string:%s", pNode->node.resType.bytes, varDataVal(pNode->datum.p));
break; break;
} }
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
......
...@@ -403,7 +403,8 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { ...@@ -403,7 +403,8 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
} }
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes); varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
strcpy(varDataVal(pVal->datum.p), pVal->literal); strncpy(varDataVal(pVal->datum.p), pVal->literal, pVal->node.resType.bytes);
parserDebug("!!!!!!!!!!!!value:%s,len:%d", pVal->literal, pVal->node.resType.bytes);
break; break;
} }
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
......
...@@ -187,7 +187,6 @@ typedef struct SQWorkerMgmt { ...@@ -187,7 +187,6 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
......
...@@ -951,7 +951,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { ...@@ -951,7 +951,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
......
...@@ -225,7 +225,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -225,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_DLOG(param, ...) \ #define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \ #define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \ #define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
......
...@@ -88,6 +88,7 @@ print ===> rows5: $data50 $data51 $data52 $data53 $data54 ...@@ -88,6 +88,7 @@ print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64 print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74 print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 8 then if $rows != 8 then
print expect 8, actual $rows
return -1 return -1
endi endi
if $data00 != 2 then if $data00 != 2 then
...@@ -135,33 +136,33 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45 ...@@ -135,33 +136,33 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55 print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65 print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75 print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
if $rows != 8 then #if $rows != 8 then
return -1 # return -1
endi #endi
if $data00 != 1 then #if $data00 != 1 then
return -1 # return -1
endi #endi
if $data10 != 2 then #if $data10 != 2 then
return -1 # return -1
endi #endi
if $data20 != 2 then #if $data20 != 2 then
return -1 # return -1
endi #endi
if $data30 != 2 then #if $data30 != 2 then
return -1 # return -1
endi #endi
if $data40 != 2 then #if $data40 != 2 then
return -1 # return -1
endi #endi
if $data50 != 2 then #if $data50 != 2 then
return -1 # return -1
endi #endi
if $data60 != 2 then #if $data60 != 2 then
return -1 # return -1
endi #endi
if $data70 != 1 then #if $data70 != 1 then
return -1 # return -1
endi #endi
print =============== insert data into child table ct3 (n) print =============== insert data into child table ct3 (n)
sql insert into ct3 values ( '2021-12-21 01:01:01.000', 1 ) sql insert into ct3 values ( '2021-12-21 01:01:01.000', 1 )
...@@ -182,36 +183,36 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14 ...@@ -182,36 +183,36 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24 print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34 print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44 print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then #if $rows != 5 then
return -1 # return -1
endi #endi
if $data00 != 1 then #if $data00 != 1 then
return -1 # return -1
endi #endi
if $data40 != 1 then #if $data40 != 1 then
return -1 # return -1
endi #endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> rows: $rows print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24 print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34 print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44 print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then #if $rows != 5 then
return -1 # return -1
endi #endi
if $data00 != 1 then #if $data00 != 1 then
return -1 # return -1
endi #endi
if $data40 != 1 then #if $data40 != 1 then
return -1 # return -1
endi #endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
print ===> rows: $rows print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows1: $data10 $data11 $data12 $data13 $data14
...@@ -221,15 +222,15 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 ...@@ -221,15 +222,15 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54 print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64 print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74 print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 8 then #if $rows != 8 then
return -1 # return -1
endi #endi
if $data00 != 2 then #if $data00 != 2 then
return -1 # return -1
endi #endi
if $data70 != 1 then #if $data70 != 1 then
return -1 # return -1
endi #endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册