提交 11657bd1 编写于 作者: H Haojun Liao

Merge branch 'feature/3_liaohj' of github.com:taosdata/tdengine into feature/3_liaohj

...@@ -113,6 +113,7 @@ int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * ...@@ -113,6 +113,7 @@ int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -332,7 +332,6 @@ int32_t* taosGetErrno(); ...@@ -332,7 +332,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) #define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) #define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) #define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
...@@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta); ...@@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -24,12 +24,12 @@ extern "C" { ...@@ -24,12 +24,12 @@ extern "C" {
// tsdbDebug ================ // tsdbDebug ================
// clang-format off // clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct TSDBROW TSDBROW; typedef struct TSDBROW TSDBROW;
...@@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph); ...@@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2); int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock); bool tBlockHasSma(SBlock *pBlock);
// SBlockIdx // SBlockIdx
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs); int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
...@@ -126,6 +125,8 @@ void tColDataClear(void *ph); ...@@ -126,6 +125,8 @@ void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tPutColData(uint8_t *p, SColData *pColData);
int32_t tGetColData(uint8_t *p, SColData *pColData);
// SBlockData // SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
...@@ -134,14 +135,17 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); ...@@ -134,14 +135,17 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tBlockDataInit(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData); void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph);
...@@ -202,7 +206,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile); ...@@ -202,7 +206,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid); void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState); SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
// SDataFWriter // SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
...@@ -357,10 +361,6 @@ struct TSDBROW { ...@@ -357,10 +361,6 @@ struct TSDBROW {
struct SBlockIdx { struct SBlockIdx {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t offset; int64_t offset;
int64_t size; int64_t size;
}; };
......
...@@ -309,6 +309,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); ...@@ -309,6 +309,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
struct SSnapDataHdr { struct SSnapDataHdr {
int8_t type; int8_t type;
int64_t index;
int64_t size; int64_t size;
uint8_t data[]; uint8_t data[];
}; };
......
...@@ -26,60 +26,72 @@ struct SMetaSnapReader { ...@@ -26,60 +26,72 @@ struct SMetaSnapReader {
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
int32_t c = 0; int32_t c = 0;
SMetaSnapReader* pMetaSnapReader = NULL; SMetaSnapReader* pReader = NULL;
// alloc // alloc
pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader)); pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pMetaSnapReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pMetaSnapReader->pMeta = pMeta; pReader->pMeta = pMeta;
pMetaSnapReader->sver = sver; pReader->sver = sver;
pMetaSnapReader->ever = ever; pReader->ever = ever;
// impl // impl
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL); code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
*ppReader = pMetaSnapReader; metaInfo("vgId:%d vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
*ppReader = pReader;
return code; return code;
_err: _err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) { int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pTbc); tdbTbcClose((*ppReader)->pTbc);
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
*ppReader = NULL; *ppReader = NULL;
return 0;
return code;
} }
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL; const void* pKey = NULL;
const void* pData = NULL; const void* pData = NULL;
int32_t nKey = 0; int32_t nKey = 0;
int32_t nData = 0; int32_t nData = 0;
int32_t code = 0; STbDbKey key;
*ppData = NULL;
for (;;) { for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
code = TSDB_CODE_VND_READ_END;
goto _exit; goto _exit;
} }
if (((STbDbKey*)pData)->version < pReader->sver) { key = ((STbDbKey*)pKey)[0];
if (key.version > pReader->ever) {
goto _exit;
}
if (key.version < pReader->sver) {
tdbTbcMoveToNext(pReader->pTbc); tdbTbcMoveToNext(pReader->pTbc);
continue; continue;
} }
...@@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { ...@@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
break; break;
} }
// copy the data ASSERT(pData && nData);
if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; goto _err;
} }
((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
((SSnapDataHdr*)(*ppData))->size = nData; SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData); pHdr->type = 0; // TODO: use macro
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit: _exit:
return code; return code;
_err:
metaError("vgId:%d vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
return code;
} }
// SMetaSnapWriter ======================================== // SMetaSnapWriter ========================================
...@@ -108,18 +131,6 @@ struct SMetaSnapWriter { ...@@ -108,18 +131,6 @@ struct SMetaSnapWriter {
int64_t ever; int64_t ever;
}; };
static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) { int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
SMetaSnapWriter* pWriter; SMetaSnapWriter* pWriter;
...@@ -148,10 +159,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { ...@@ -148,10 +159,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
SMetaSnapWriter* pWriter = *ppWriter; SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) { if (rollback) {
code = metaSnapRollback(pWriter); ASSERT(0);
if (code) goto _err;
} else { } else {
code = metaSnapCommit(pWriter); code = metaCommit(pWriter->pMeta);
if (code) goto _err; if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
...@@ -170,15 +180,16 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -170,15 +180,16 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
SMetaEntry metaEntry = {0}; SMetaEntry metaEntry = {0};
SDecoder* pDecoder = &(SDecoder){0}; SDecoder* pDecoder = &(SDecoder){0};
tDecoderInit(pDecoder, pData, nData); tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
metaDecodeEntry(pDecoder, &metaEntry); metaDecodeEntry(pDecoder, &metaEntry);
code = metaHandleEntry(pMeta, &metaEntry); code = metaHandleEntry(pMeta, &metaEntry);
if (code) goto _err; if (code) goto _err;
tDecoderClear(pDecoder);
return code; return code;
_err: _err:
metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code; return code;
} }
\ No newline at end of file
...@@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else { } else {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
if (state->pBlockData) { if (state->pBlockData) {
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
...@@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err; if (code) goto _err;
/* if (state->pBlockIdx) { */ /* if (state->pBlockIdx) { */
/* tBlockIdxReset(state->blockIdx); */
/* } */ /* } */
/* tBlockIdxReset(state->blockIdx); */
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, /* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
* &state->blockIdx); * &state->blockIdx);
*/ */
...@@ -582,8 +580,8 @@ _err: ...@@ -582,8 +580,8 @@ _err:
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
...@@ -609,8 +607,8 @@ int32_t clearNextRowFromFS(void *iter) { ...@@ -609,8 +607,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
......
...@@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->aBlockIdx); taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap); tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData); tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
if (pRSet) { if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err; if (code) goto _err;
...@@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
.fLast = {.commitID = pCommitter->commitID, .size = 0}, .fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = pRSet->fSma}; .fSma = pRSet->fSma};
} else { } else {
STfs *pTfs = pTsdb->pVnode->pTfs; wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
SDiskID did = {.level = 0, .id = 0};
// TODO: alloc a new disk
// tfsAllocDisk(pTfs, 0, &did);
// create the directory
tfsMkdirRecurAt(pTfs, pTsdb->path, did);
wSet = (SDFileSet){.diskId = did,
.fid = pCommitter->commitFid, .fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pCommitter->commitID, .size = 0}, .fData = {.commitID = pCommitter->commitID, .size = 0},
...@@ -1001,10 +992,10 @@ _exit: ...@@ -1001,10 +992,10 @@ _exit:
static void tsdbCommitDataEnd(SCommitter *pCommitter) { static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx); taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap); tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData); tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN); taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap); tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData); tBlockDataClear(&pCommitter->nBlockData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }
......
...@@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) { ...@@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) {
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; } SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag) {
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, flag);
} }
...@@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo { ...@@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
...@@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, NULL, NULL); pBlockData, NULL, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead ...@@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
while (1) { while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
...@@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { ...@@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
terrno = code; terrno = code;
return NULL; return NULL;
} }
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
...@@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
hasNext = (pBlockIter->numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr); // pReader->pFileGroup->fid, pReader->idStr);
} }
return code; return code;
...@@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl ...@@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
tFree(pBuf1); tFree(pBuf1);
...@@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p ...@@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
// merge two block data // merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
ASSERT(pBlock->nRow == pBlockData->nRow); ASSERT(pBlock->nRow == pBlockData->nRow);
......
...@@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u ...@@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
// alloc // alloc
code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
if (code) goto _err; if (code) goto _exit;
code = tRealloc(&pMapData->pData, pMapData->nData); code = tRealloc(&pMapData->pData, pMapData->nData);
if (code) goto _err; if (code) goto _exit;
// put // put
pMapData->aOffset[nItem] = offset; pMapData->aOffset[nItem] = offset;
tPutItemFn(pMapData->pData + offset, pItem); tPutItemFn(pMapData->pData + offset, pItem);
_err: _exit:
return code; return code;
} }
...@@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) { ...@@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
} }
// SBlockIdx ====================================================== // SBlockIdx ======================================================
void tBlockIdxReset(SBlockIdx *pBlockIdx) {
pBlockIdx->minKey = TSKEY_MAX;
pBlockIdx->maxKey = TSKEY_MIN;
pBlockIdx->minVersion = VERSION_MAX;
pBlockIdx->maxVersion = VERSION_MIN;
pBlockIdx->offset = -1;
pBlockIdx->size = -1;
}
int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlockIdx *pBlockIdx = (SBlockIdx *)ph; SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
n += tPutI64(p ? p + n : p, pBlockIdx->suid); n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid); n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutI64(p ? p + n : p, pBlockIdx->minKey);
n += tPutI64(p ? p + n : p, pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset); n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size); n += tPutI64v(p ? p + n : p, pBlockIdx->size);
...@@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { ...@@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid); n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid); n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetI64(p + n, &pBlockIdx->minKey);
n += tGetI64(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetI64v(p + n, &pBlockIdx->offset); n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size); n += tGetI64v(p + n, &pBlockIdx->size);
...@@ -921,6 +904,76 @@ _exit: ...@@ -921,6 +904,76 @@ _exit:
return code; return code;
} }
int32_t tPutColData(uint8_t *p, SColData *pColData) {
int32_t n = 0;
n += tPutI16v(p ? p + n : p, pColData->cid);
n += tPutI8(p ? p + n : p, pColData->type);
n += tPutI8(p ? p + n : p, pColData->smaOn);
n += tPutI32v(p ? p + n : p, pColData->nVal);
n += tPutU8(p ? p + n : p, pColData->flag);
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
if (pColData->flag != HAS_VALUE) {
// bitmap
int32_t size = BIT2_SIZE(pColData->nVal);
if (p) {
memcpy(p + n, pColData->pBitMap, size);
}
n += size;
}
if (IS_VAR_DATA_TYPE(pColData->type)) {
// offset
int32_t size = sizeof(int32_t) * pColData->nVal;
if (p) {
memcpy(p + n, pColData->aOffset, size);
}
n += size;
}
n += tPutI32v(p ? p + n : p, pColData->nData);
if (p) {
memcpy(p + n, pColData->pData, pColData->nData);
}
n += pColData->nData;
_exit:
return n;
}
int32_t tGetColData(uint8_t *p, SColData *pColData) {
int32_t n = 0;
n += tGetI16v(p + n, &pColData->cid);
n += tGetI8(p + n, &pColData->type);
n += tGetI8(p + n, &pColData->smaOn);
n += tGetI32v(p + n, &pColData->nVal);
n += tGetU8(p + n, &pColData->flag);
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
if (pColData->flag != HAS_VALUE) {
// bitmap
int32_t size = BIT2_SIZE(pColData->nVal);
pColData->pBitMap = p + n;
n += size;
}
if (IS_VAR_DATA_TYPE(pColData->type)) {
// offset
int32_t size = sizeof(int32_t) * pColData->nVal;
pColData->aOffset = (int32_t *)(p + n);
n += size;
}
n += tGetI32v(p + n, &pColData->nData);
pColData->pData = p + n;
n += pColData->nData;
_exit:
return n;
}
static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
SColData *pColData1 = (SColData *)p1; SColData *pColData1 = (SColData *)p1;
SColData *pColData2 = (SColData *)p2; SColData *pColData2 = (SColData *)p2;
...@@ -962,11 +1015,11 @@ void tBlockDataReset(SBlockData *pBlockData) { ...@@ -962,11 +1015,11 @@ void tBlockDataReset(SBlockData *pBlockData) {
taosArrayClear(pBlockData->aIdx); taosArrayClear(pBlockData->aIdx);
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
} }
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
...@@ -1079,6 +1132,46 @@ _err: ...@@ -1079,6 +1132,46 @@ _err:
return code; return code;
} }
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
int32_t iColData = 0;
for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom);
while (true) {
SColData *pColData;
if (iColData < taosArrayGetSize(pBlockData->aIdx)) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
} else {
pColData = NULL;
}
if (pColData == NULL || pColData->cid > pColDataFrom->cid) {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
}
iColData++;
break;
} else if (pColData->cid == pColDataFrom->cid) {
iColData++;
break;
} else {
iColData++;
}
}
}
_exit:
return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
...@@ -1239,6 +1332,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD ...@@ -1239,6 +1332,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
*ppColData = NULL; *ppColData = NULL;
} }
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) {
int32_t n = 0;
n += tPutI32v(p ? p + n : p, pBlockData->nRow);
if (p) {
memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow);
}
n = n + sizeof(int64_t) * pBlockData->nRow;
if (p) {
memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow);
}
n = n + sizeof(TSKEY) * pBlockData->nRow;
int32_t nCol = taosArrayGetSize(pBlockData->aIdx);
n += tPutI32v(p ? p + n : p, nCol);
for (int32_t iCol = 0; iCol < nCol; iCol++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
n += tPutColData(p ? p + n : p, pColData);
}
return n;
}
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
int32_t n = 0;
tBlockDataReset(pBlockData);
n += tGetI32v(p + n, &pBlockData->nRow);
pBlockData->aVersion = (int64_t *)(p + n);
n = n + sizeof(int64_t) * pBlockData->nRow;
pBlockData->aTSKEY = (TSKEY *)(p + n);
n = n + sizeof(TSKEY) * pBlockData->nRow;
int32_t nCol;
n += tGetI32v(p + n, &nCol);
for (int32_t iCol = 0; iCol < nCol; iCol++) {
SColData *pColData;
if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1;
n += tGetColData(p + n, pColData);
}
return n;
}
// ALGORITHM ============================== // ALGORITHM ==============================
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal; SColVal colVal;
......
...@@ -20,13 +20,13 @@ struct SVSnapReader { ...@@ -20,13 +20,13 @@ struct SVSnapReader {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
int8_t metaDone; int8_t metaDone;
SMetaSnapReader *pMetaReader; SMetaSnapReader *pMetaReader;
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
uint8_t *pData;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
...@@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe ...@@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader); vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
if (code) goto _err;
code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
if (code) goto _err;
*ppReader = pReader; *ppReader = pReader;
return code; return code;
...@@ -60,54 +55,85 @@ _err: ...@@ -60,54 +55,85 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
tFree(pReader->pData); if (pReader->pTsdbReader) {
if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader); tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader); }
taosMemoryFree(pReader);
if (pReader->pMetaReader) {
metaSnapReaderClose(&pReader->pMetaReader);
}
vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode));
taosMemoryFree(pReader);
return code; return code;
} }
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
// META ==============
if (!pReader->metaDone) { if (!pReader->metaDone) {
code = metaSnapRead(pReader->pMetaReader, &pReader->pData); // open reader if not
if (pReader->pMetaReader == NULL) {
code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
if (code) goto _err;
}
code = metaSnapRead(pReader->pMetaReader, ppData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { goto _err;
pReader->metaDone = 1; } else {
if (*ppData) {
goto _exit;
} else { } else {
goto _err; pReader->metaDone = 1;
code = metaSnapReaderClose(&pReader->pMetaReader);
if (code) goto _err;
} }
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
goto _exit;
} }
} }
// TSDB ==============
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData); // open if not
if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader);
if (code) goto _err;
}
code = tsdbSnapRead(pReader->pTsdbReader, ppData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { goto _err;
pReader->tsdbDone = 1; } else {
if (*ppData) {
goto _exit;
} else { } else {
goto _err; pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
} }
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
goto _exit;
} }
} }
code = TSDB_CODE_VND_READ_END; *ppData = NULL;
*nData = 0;
_exit: _exit:
if (*ppData) {
SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
} else {
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
}
return code; return code;
_err: _err:
vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -116,24 +142,13 @@ struct SVSnapWriter { ...@@ -116,24 +142,13 @@ struct SVSnapWriter {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
}; };
static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
SVSnapWriter *pWriter = NULL; SVSnapWriter *pWriter = NULL;
...@@ -148,62 +163,78 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -148,62 +163,78 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
*ppWriter = pWriter;
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppWriter = NULL;
return code; return code;
} }
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
if (rollback) { if (pWriter->pMetaSnapWriter) {
code = vnodeSnapRollback(pWriter); code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
if (code) goto _err; if (code) goto _err;
} else { }
code = vnodeSnapCommit(pWriter);
if (pWriter->pTsdbSnapWriter) {
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
if (code) goto _err; if (code) goto _err;
} }
_exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
return code; return code;
} }
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData); ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
ASSERT(pHdr->index == pWriter->index + 1);
pWriter->index = pHdr->index;
if (pSnapDataHdr->type == 0) { vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData);
if (pHdr->type == 0) {
// meta // meta
if (pWriter->pMetaSnapWriter == NULL) { if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err; if (code) goto _err;
} }
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} else { } else {
// tsdb // tsdb
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err; if (code) goto _err;
} }
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
tstrerror(code), pHdr->index, pHdr->type, nData);
return code; return code;
} }
\ No newline at end of file
...@@ -2408,6 +2408,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2408,6 +2408,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = diffFunctionSetup,
.processFunc = diffFunction, .processFunc = diffFunction,
.sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
......
...@@ -2408,6 +2408,10 @@ int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam ...@@ -2408,6 +2408,10 @@ int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return nonCalcScalarFunction(pInput, inputNum, pOutput); return nonCalcScalarFunction(pInput, inputNum, pOutput);
} }
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return nonCalcScalarFunction(pInput, inputNum, pOutput);
}
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return avgScalarFunction(pInput, inputNum, pOutput); return avgScalarFunction(pInput, inputNum, pOutput);
} }
......
...@@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); ...@@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot -------------- // snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
......
...@@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
// maybe update commit index by snapshot
syncNodeMaybeUpdateCommitBySnapshot(ths);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
...@@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = ths->commitIndex;
// msg event log // msg event log
do { do {
......
...@@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync ...@@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// print log
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
pMsg->matchIndex, pMsg->success);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
...@@ -238,7 +247,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -238,7 +247,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSnapshot oldSnapshot; SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm; SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg);
SyncIndex endIndex;
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex + 1)) {
endIndex = nextIndex;
} else {
endIndex = oldSnapshot.lastApplyIndex;
}
syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, endIndex, newSnapshotTerm, pMsg);
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
...@@ -256,6 +272,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -256,6 +272,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} }
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
}
// event log, update next-index // event log, update next-index
do { do {
char host[64]; char host[64];
......
...@@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
return pSyncNode; return pSyncNode;
} }
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot;
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
pSyncNode->commitIndex = snapshot.lastApplyIndex;
}
}
}
void syncNodeStart(SSyncNode* pSyncNode) { void syncNodeStart(SSyncNode* pSyncNode) {
// start raft // start raft
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
......
...@@ -336,8 +336,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists ...@@ -336,8 +336,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
...@@ -79,22 +79,39 @@ class TDSql: ...@@ -79,22 +79,39 @@ class TDSql:
self.queryResult = None self.queryResult = None
tdLog.info("sql:%s, expect error occured" % (sql)) tdLog.info("sql:%s, expect error occured" % (sql))
def query(self, sql, row_tag=None): def query(self, sql, row_tag=None,queyTimes=10):
self.sql = sql self.sql = sql
try: i=1
self.cursor.execute(sql) while i <= queyTimes:
self.queryResult = self.cursor.fetchall() try:
self.queryRows = len(self.queryResult) self.cursor.execute(sql)
self.queryCols = len(self.cursor.description) self.queryResult = self.cursor.fetchall()
except Exception as e: self.queryRows = len(self.queryResult)
caller = inspect.getframeinfo(inspect.stack()[1][0]) self.queryCols = len(self.cursor.description)
args = (caller.filename, caller.lineno, sql, repr(e)) if row_tag:
tdLog.notice("%s(%d) failed: sql:%s, %s" % args) return self.queryResult
traceback.print_exc() return self.queryRows
raise Exception(repr(e)) except Exception as e:
if row_tag: i+=1
return self.queryResult tdLog.notice("Try to query again, query times: %d "%i)
return self.queryRows pass
else:
try:
tdLog.notice("Try the last query ")
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
if row_tag:
return self.queryResult
return self.queryRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
traceback.print_exc()
raise Exception(repr(e))
def is_err_sql(self, sql): def is_err_sql(self, sql):
err_flag = True err_flag = True
...@@ -266,16 +283,27 @@ class TDSql: ...@@ -266,16 +283,27 @@ class TDSql:
time.sleep(1) time.sleep(1)
continue continue
def execute(self, sql): def execute(self, sql,queyTimes=10):
self.sql = sql self.sql = sql
try: i=1
self.affectedRows = self.cursor.execute(sql) while i <= queyTimes:
except Exception as e: try:
caller = inspect.getframeinfo(inspect.stack()[1][0]) self.affectedRows = self.cursor.execute(sql)
args = (caller.filename, caller.lineno, sql, repr(e)) return self.affectedRows
tdLog.notice("%s(%d) failed: sql:%s, %s" % args) except Exception as e:
raise Exception(repr(e)) i+=1
return self.affectedRows tdLog.notice("Try to execute sql again, query times: %d "%i)
pass
else:
try:
tdLog.notice("Try the last execute sql ")
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
def checkAffectedRows(self, expectAffectedRows): def checkAffectedRows(self, expectAffectedRows):
if self.affectedRows != expectAffectedRows: if self.affectedRows != expectAffectedRows:
......
...@@ -91,21 +91,21 @@ if $rows != $vgroups then ...@@ -91,21 +91,21 @@ if $rows != $vgroups then
return -1 return -1
endi endi
if $data[0][4] == LEADER then if $data[0][4] == leader then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi endi
endi endi
elif $data[0][6] == LEADER then elif $data[0][6] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi endi
endi endi
elif $data[0][8] == LEADER then elif $data[0][8] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi endi
endi endi
...@@ -113,21 +113,21 @@ else ...@@ -113,21 +113,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[1][4] == LEADER then if $data[1][4] == leader then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi endi
endi endi
elif $data[1][6] == LEADER then elif $data[1][6] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi endi
endi endi
elif $data[1][8] == LEADER then elif $data[1][8] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi endi
endi endi
...@@ -135,21 +135,21 @@ else ...@@ -135,21 +135,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[2][4] == LEADER then if $data[2][4] == leader then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi endi
endi endi
elif $data[2][6] == LEADER then elif $data[2][6] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi endi
endi endi
elif $data[2][8] == LEADER then elif $data[2][8] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi endi
endi endi
...@@ -157,21 +157,21 @@ else ...@@ -157,21 +157,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[3][4] == LEADER then if $data[3][4] == leader then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi endi
endi endi
elif $data[3][6] == LEADER then elif $data[3][6] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi endi
endi endi
elif $data[3][8] == LEADER then elif $data[3][8] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi endi
endi endi
...@@ -179,21 +179,21 @@ else ...@@ -179,21 +179,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[4][4] == LEADER then if $data[4][4] == leader then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi endi
endi endi
elif $data[4][6] == LEADER then elif $data[4][6] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi endi
endi endi
elif $data[4][8] == LEADER then elif $data[4][8] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi endi
endi endi
...@@ -286,13 +286,13 @@ if $data[0][0] != 1 then ...@@ -286,13 +286,13 @@ if $data[0][0] != 1 then
return -1 return -1
endi endi
if $data[0][2] != LEADER then if $data[0][2] != leader then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
...@@ -318,21 +318,21 @@ if $rows != $vgroups then ...@@ -318,21 +318,21 @@ if $rows != $vgroups then
return -1 return -1
endi endi
if $data[0][4] == LEADER then if $data[0][4] == leader then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi endi
endi endi
elif $data[0][6] == LEADER then elif $data[0][6] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi endi
endi endi
elif $data[0][8] == LEADER then elif $data[0][8] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi endi
endi endi
...@@ -340,21 +340,21 @@ else ...@@ -340,21 +340,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[1][4] == LEADER then if $data[1][4] == leader then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi endi
endi endi
elif $data[1][6] == LEADER then elif $data[1][6] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi endi
endi endi
elif $data[1][8] == LEADER then elif $data[1][8] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi endi
endi endi
...@@ -362,21 +362,21 @@ else ...@@ -362,21 +362,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[2][4] == LEADER then if $data[2][4] == leader then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi endi
endi endi
elif $data[2][6] == LEADER then elif $data[2][6] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi endi
endi endi
elif $data[2][8] == LEADER then elif $data[2][8] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi endi
endi endi
...@@ -384,21 +384,21 @@ else ...@@ -384,21 +384,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[3][4] == LEADER then if $data[3][4] == leader then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi endi
endi endi
elif $data[3][6] == LEADER then elif $data[3][6] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi endi
endi endi
elif $data[3][8] == LEADER then elif $data[3][8] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi endi
endi endi
...@@ -406,21 +406,21 @@ else ...@@ -406,21 +406,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[4][4] == LEADER then if $data[4][4] == leader then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi endi
endi endi
elif $data[4][6] == LEADER then elif $data[4][6] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi endi
endi endi
elif $data[4][8] == LEADER then elif $data[4][8] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi endi
endi endi
...@@ -539,27 +539,27 @@ if $data[0][0] != 1 then ...@@ -539,27 +539,27 @@ if $data[0][0] != 1 then
return -1 return -1
endi endi
if $data[0][2] == LEADER then if $data[0][2] == leader then
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi
if $data[1][2] == LEADER then if $data[1][2] == leader then
if $data[0][2] != FOLLOWER then if $data[0][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi
if $data[2][2] == LEADER then if $data[2][2] == leader then
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[0][2] != FOLLOWER then if $data[0][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi
......
...@@ -170,84 +170,3 @@ if $rows != 100 then ...@@ -170,84 +170,3 @@ if $rows != 100 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
#system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 7000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
#system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
#system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
import taos
import sys
import datetime
import inspect
from util.log import *
from util.sql import *
from util.cases import *
import random
class TDTestCase:
updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143,
"jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143,
"wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "udfDebugFlag": 143}
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def case1(self):
tdSql.execute("create database if not exists dbms precision 'ms'")
tdSql.execute("create database if not exists dbus precision 'us'")
tdSql.execute("create database if not exists dbns precision 'ns'")
tdSql.execute("create table dbms.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("create table dbus.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("create table dbns.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.001', 1, 2)")
tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.002', 3, 4)")
tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000001', 1, 2)")
tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000002', 3, 4)")
tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000001', 1, 2)")
tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000002', 3, 4)")
tdSql.query("select count(c1) from dbms.ntb interval(1a)")
tdSql.checkRows(2)
tdSql.query("select count(c1) from dbus.ntb interval(1u)")
tdSql.checkRows(2)
tdSql.query("select count(c1) from dbns.ntb interval(1b)")
tdSql.checkRows(2)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("==========start case1 run ...............")
self.case1()
tdLog.printNoPrefix("==========end case1 run ...............")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -392,6 +392,31 @@ class TDTestCase: ...@@ -392,6 +392,31 @@ class TDTestCase:
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
assert unionallQnode==tdSql.queryResult assert unionallQnode==tdSql.queryResult
queryPolicy=1
tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
tdSql.query("show local variables;")
for i in range(tdSql.queryRows):
if tdSql.queryResult[i][0] == "queryPolicy" :
if int(tdSql.queryResult[i][1]) == int(queryPolicy):
tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
tdSql.execute("reset query cache")
tdSql.execute("use db1;")
tdSql.query("show dnodes;")
dnodeId=tdSql.getData(0,0)
tdSql.query("select max(c1) from stb10;")
assert maxQnode==tdSql.getData(0,0)
tdSql.query("select min(c1) from stb11;")
assert minQnode==tdSql.getData(0,0)
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
assert unionQnode==tdSql.queryResult
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
assert unionallQnode==tdSql.queryResult
# test case : queryPolicy = 2 # test case : queryPolicy = 2
def test_case2(self): def test_case2(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10) self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10)
......
...@@ -40,9 +40,9 @@ class TDTestCase: ...@@ -40,9 +40,9 @@ class TDTestCase:
f"insert into rct{j} values ( {ts+i*10000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" f"insert into rct{j} values ( {ts+i*10000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
) )
tdSql.execute( tdSql.execute(
f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {1+i},{1500+i*20}, {150+i*2},{5+i} )" f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {i},{1500+i*20}, {150+i*2},{5+i} )"
) )
tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
# def check_avg(self ,origin_query , check_query): # def check_avg(self ,origin_query , check_query):
# avg_result = tdSql.getResult(origin_query) # avg_result = tdSql.getResult(origin_query)
# origin_result = tdSql.getResult(check_query) # origin_result = tdSql.getResult(check_query)
...@@ -60,13 +60,15 @@ class TDTestCase: ...@@ -60,13 +60,15 @@ class TDTestCase:
def tsbsIotQuery(self): def tsbsIotQuery(self):
tdSql.execute("use db_tsbs") tdSql.execute("use db_tsbs")
# test interval and partition # test interval and partition
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ") tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
print(tdSql.queryResult)
parRows=tdSql.queryRows parRows=tdSql.queryRows
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ") tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
# tdSql.checkRows(parRows) tdSql.checkRows(parRows)
# test insert into # test insert into
...@@ -77,18 +79,53 @@ class TDTestCase: ...@@ -77,18 +79,53 @@ class TDTestCase:
# test paitition interval fill # test paitition interval fill
# tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;") tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;")
# # test partition interval limit # test partition interval limit (PRcore-TD-17410)
# tdSql.query("SELECT ts,model,floor(2*(sum(nzs)/count(nzs)))/floor(2*(sum(nzs)/count(nzs))) AS broken_down FROM (SELECT ts,model, status/status AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model,ts interval(10m) limit 10;") # tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
# tdSql.checkRows(10) # tdSql.checkRows(10)
# test partition interval Pseudo time-column # test partition interval Pseudo time-column
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
# 1 high-load:
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
# 2 stationary-trucks
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)")
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name")
# 3 long-driving-sessions
# tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
#4 long-daily-sessions
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60")
# 5. avg-daily-driving-duration
tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;")
# 6. avg-daily-driving-session
#taosc core dumped
tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))")
tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;")
tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)")
# 7. avg-load
tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;")
# 8. daily-activity
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
#it's already supported:
# last-loc
tdSql.query("")
# test
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdLog.printNoPrefix("==========step1:create database and table,insert data ==============") tdLog.printNoPrefix("==========step1:create database and table,insert data ==============")
self.prepareData() self.prepareData()
......
...@@ -426,7 +426,7 @@ class TDTestCase: ...@@ -426,7 +426,7 @@ class TDTestCase:
tdSql.query(" select unique(t1+c1) from stb1 ") tdSql.query(" select unique(t1+c1) from stb1 ")
tdSql.checkRows(13) tdSql.checkRows(13)
tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ") tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ")
tdSql.checkRows(13) tdSql.checkRows(20)
tdSql.query(" select unique(t1) from stb1 partition by tbname ") tdSql.query(" select unique(t1) from stb1 partition by tbname ")
tdSql.checkRows(2) tdSql.checkRows(2)
......
...@@ -29,6 +29,7 @@ python3 ./test.py -f 1-insert/block_wise.py ...@@ -29,6 +29,7 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 2-query/db.py
python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py python3 ./test.py -f 2-query/varchar.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册