提交 4468fb68 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

......@@ -328,7 +328,7 @@ function install_header() {
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || :
[ -f ${install_main_dir}/include/taosws.h ] && ${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || :
}
function add_newHostname_to_hosts() {
......
......@@ -161,13 +161,11 @@ if [[ $productName == "TDengine" ]]; then
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
fi
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
fi
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:
git clone --depth 1 https://github.com/taosdata/taos-connector-python ${install_dir}/connector/python
rm -rf ${install_dir}/connector/python/.git ||:
# cp -r ${connector_dir}/python ${install_dir}/connector
......
......@@ -290,19 +290,17 @@ fi
# Copy driver
mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt
cp ${wslib_files} ${install_dir}/driver || :
[ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || :
# Copy connector
if [ "$verMode" == "cluster" ]; then
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:
git clone --depth 1 https://github.com/taosdata/taos-connector-python ${install_dir}/connector/python
rm -rf ${install_dir}/connector/python/.git ||:
......@@ -314,6 +312,7 @@ if [ "$verMode" == "cluster" ]; then
git clone --depth 1 https://github.com/taosdata/libtaos-rs ${install_dir}/connector/rust
rm -rf ${install_dir}/connector/rust/.git ||:
# cp -r ${connector_dir}/python ${install_dir}/connector
# cp -r ${connector_dir}/nodejs ${install_dir}/connector
fi
......
......@@ -114,6 +114,7 @@ function clean_header() {
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${inc_link_dir}/taosws.h || :
}
......
......@@ -357,6 +357,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.inExecCache = true;
if (pStmt->sql.autoCreateTbl) {
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
}
......@@ -365,9 +366,11 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (pStmt->bInfo.inExecCache) {
ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1);
pStmt->bInfo.needParse = false;
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -391,6 +394,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64 , pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
return TSDB_CODE_SUCCESS;
}
......@@ -406,6 +411,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -420,6 +427,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -440,6 +449,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.boundTags = pCache->boundTags;
pStmt->bInfo.tagsCached = true;
tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -461,6 +472,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -510,6 +523,8 @@ TAOS_STMT* stmtInit(STscObj* taos) {
int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
tscDebug("stmt start to prepare");
if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt));
}
......@@ -529,6 +544,8 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
tscDebug("stmt start to set tbName: %s", tbName);
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
int32_t insert = 0;
......@@ -559,6 +576,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
tscDebug("stmt start to set tbTags");
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.inExecCache) {
......@@ -572,6 +591,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
tscDebug("start to bind stmt tag values");
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.sname.tname,
tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
......@@ -617,6 +637,8 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
tscDebug("start to bind stmt data, colIdx: %d", colIdx);
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
......@@ -707,6 +729,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
int stmtAddBatch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
tscDebug("stmt start to add batch");
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
STMT_ERR_RET(stmtCacheBlock(pStmt));
......@@ -715,6 +739,8 @@ int stmtAddBatch(TAOS_STMT* stmt) {
}
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
if (pRsp->nBlocks <= 0) {
tscError("invalid submit resp block number %d", pRsp->nBlocks);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
......@@ -727,11 +753,6 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
if (pMeta->uid != pStmt->bInfo.tbUid) {
tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
if (pMeta->uid) {
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue;
......@@ -775,6 +796,8 @@ int stmtExec(TAOS_STMT* stmt) {
SSubmitRsp* pRsp = NULL;
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
tscDebug("stmt start to exec");
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
......
......@@ -138,6 +138,8 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->dbId = pCreate->dbUid;
pCfg->szPage = pCreate->pageSize * 1024;
pCfg->szCache = pCreate->pages;
pCfg->cacheLast = pCreate->cacheLast;
pCfg->cacheLastSize = pCreate->cacheLastSize;
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
pCfg->isWeak = true;
pCfg->isTsma = pCreate->isTsma;
......
......@@ -204,6 +204,8 @@ _OVER:
mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
taosMemoryFreeClear(pSma->expr);
taosMemoryFreeClear(pSma->tagsFilter);
taosMemoryFreeClear(pSma->sql);
taosMemoryFreeClear(pSma->ast);
taosMemoryFreeClear(pRow);
return NULL;
}
......@@ -221,6 +223,8 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);
taosMemoryFreeClear(pSma->tagsFilter);
taosMemoryFreeClear(pSma->expr);
taosMemoryFreeClear(pSma->sql);
taosMemoryFreeClear(pSma->ast);
return 0;
}
......
......@@ -140,7 +140,10 @@ int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList,
void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
// tq
......@@ -210,11 +213,13 @@ struct SVnodeCfg {
int32_t vgId;
char dbname[TSDB_DB_FNAME_LEN];
uint64_t dbId;
int32_t cacheLastSize;
int32_t szPage;
int32_t szCache;
uint64_t szBuf;
bool isHeap;
bool isWeak;
int8_t cacheLast;
int8_t isTsma;
int8_t isRsma;
int8_t hashMethod;
......
......@@ -237,6 +237,10 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
......@@ -246,8 +250,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// structs =======================
......
......@@ -15,11 +15,15 @@
#include "tsdb.h"
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
// TODO: get cfg from vnode config: pTsdb->pVnode->config.lruCapacity
size_t cfgCapacity = 1024 * 1024;
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
if (pCache == NULL) {
......@@ -61,10 +65,11 @@ static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value)
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
......@@ -83,18 +88,79 @@ static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKe
return code;
}
static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
// clear last cache anyway, no matter where eKey ends.
taosLRUCacheRelease(pCache, h, true);
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
......@@ -173,11 +239,6 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
return code;
}
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
int32_t code = 0;
STSRow *cacheRow = NULL;
......@@ -405,7 +466,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
case SFSNEXTROW_FS:
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet - 1;
state->iFileSet = state->nFileSet;
state->pBlockData = NULL;
......@@ -1679,52 +1740,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
return code;
}
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
......@@ -1732,3 +1747,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
return code;
}
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
}
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
......@@ -181,8 +181,12 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey);
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
}
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
}
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
......@@ -556,12 +560,14 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pTbData->maxKey = key.ts;
}
if (pLastRow != NULL) {
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
}
}
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
}
pTbData->minVersion = TMIN(pTbData->minVersion, version);
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
......
......@@ -20,6 +20,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.dbId = 0,
.szPage = 4096,
.szCache = 256,
.cacheLast = 3,
.cacheLastSize = 8,
.szBuf = 96 * 1024 * 1024,
.isHeap = false,
.isWeak = 0,
......@@ -60,6 +62,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
......@@ -133,6 +137,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "cacheLast", pCfg->cacheLast, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "cacheLastSize", pCfg->cacheLastSize, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code);
......
......@@ -936,6 +936,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
alterReq.cacheLastSize);
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
// TODO: save config
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
}
return 0;
}
......
......@@ -20,6 +20,7 @@
#include "os.h"
#include "tname.h"
#include "ttypes.h"
#include "query.h"
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
......
......@@ -630,6 +630,17 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
((STableDataBlocks*)(*pDst))->cloned = true;
STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
if (pBlock->pTableMeta) {
void *pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
if (NULL == pNewMeta) {
taosMemoryFreeClear(*pDst);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
pBlock->pTableMeta = pNewMeta;
}
return qResetStmtDataBlock(*pDst, false);
}
......
......@@ -476,12 +476,12 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
}
......
......@@ -1986,7 +1986,8 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType) {
return false;
}
}
......
......@@ -1519,6 +1519,7 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
pSubplan->msgType = pModify->msgType;
pSubplan->execNode.nodeId = pModify->pVgDataBlocks->vg.vgId;
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}
......
......@@ -877,7 +877,7 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
......@@ -887,6 +887,25 @@ static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
if (NULL != pScan->pGroupTags) {
return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
}
return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
}
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanCols) {
......
......@@ -59,4 +59,6 @@ TEST_F(PlanPartitionByTest, withGroupBy) {
useDb("root", "test");
run("select count(*) from t1 partition by c1 group by c2");
run("SELECT TBNAME, c1 FROM st1 PARTITION BY TBNAME GROUP BY c1");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册