提交 290667dd 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/sim

......@@ -39,10 +39,10 @@
# number of management nodes in the system
# numOfMnodes 3
# enable/disable backuping vnode directory when removing dnode
# enable/disable backuping vnode directory when removing vnode
# vnodeBak 1
# if report installation / use information
# enable/disable installation / usage report
# telemetryReporting 1
# enable/disable load balancing
......@@ -81,7 +81,7 @@
# minimum time window, milli-second
# minIntervalTime 10
# maximum delay before launching a stream compution, milli-second
# maximum delay before launching a stream computation, milli-second
# maxStreamCompDelay 20000
# maximum delay before launching a stream computation for the first time, milli-second
......@@ -156,7 +156,7 @@
# max number of connections allowed in dnode
# maxShellConns 5000
# max numerber of connections allowed in client
# max number of connections allowed in client
# maxConnections 5000
# stop writing logs when the disk size of the log folder is less than this value
......@@ -187,7 +187,7 @@
# restfulRowLimit 10240
# The following parameter is used to limit the maximum number of lines in log files.
# max number of rows per log filters
# max number of lines per log filters
# numOfLogLines 10000000
# enable/disable async log
......@@ -199,7 +199,9 @@
# The following parameters are used for debug purpose only.
# debugFlag 8 bits mask: FILE-SCREEN-UNUSED-HeartBeat-DUMP-TRACE_WARN-ERROR
# 131: output warning and error, 135: output debug, warning and error, 143 : output trace, debug, warning and error to log.
# 131: output warning and error
# 135: output debug, warning and error
# 143: output trace, debug, warning and error to log
# 199: output debug, warning and error to both screen and file
# 207: output trace, debug, warning and error to both screen and file
......@@ -231,10 +233,10 @@
# cDebugFlag 131
# debug flag for JNI
# jniDebugflag 131
# jniDebugFlag 131
# debug flag for storage
# uDebugflag 131
# uDebugFlag 131
# debug flag for http server
# httpDebugFlag 131
......@@ -243,12 +245,12 @@
# monDebugFlag 131
# debug flag for query
# qDebugflag 131
# qDebugFlag 131
# debug flag for vnode
# vDebugflag 131
# vDebugFlag 131
# debug flag for http server
# debug flag for TSDB
# tsdbDebugFlag 131
# debug flag for continue query
......
......@@ -265,8 +265,14 @@ function install_config() {
[ -f ${cfg_dir}/taos.cfg ] && ${csudo} cp ${cfg_dir}/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/*
fi
# Save standard input to 6 and open / dev / TTY on standard input
exec 6<&0 0</dev/tty
local_fqdn_check
# restore the backup standard input, and turn off 6
exec 0<&6 6<&-
${csudo} mv ${cfg_dir}/taos.cfg ${cfg_dir}/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${cfg_dir}
......@@ -422,7 +428,7 @@ function install_service() {
}
function install_TDengine() {
echo -e "${GREEN}Start to install TDEngine...${NC}"
echo -e "${GREEN}Start to install TDengine...${NC}"
#install log and data dir , then ln to /usr/local/taos
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
......
......@@ -119,4 +119,4 @@ if ((${service_mod}==2)); then
kill_taosd
fi
echo -e "${GREEN}TDEngine is removed successfully!${NC}"
echo -e "${GREEN}TDengine is removed successfully!${NC}"
......@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
return;
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
return;
}
......
......@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
}
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (SAPercentileInfo*) pCtx->aOutputBuf;
pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else {
return GET_ROWCELL_INTERBUF(pResInfo);
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
}
buildHistogramInfo(pInfo);
return pInfo;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
......@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true;
}
......@@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
......
......@@ -911,6 +911,13 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
}
}
if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
pRes->numOfRows = 0;
pBeforeFillData->num = 0;
pLocalReducer->discard = true;
return;
}
pRes->numOfRowsGroup += pRes->numOfRows;
// impose the limitation of output rows on the final result
......
......@@ -284,16 +284,18 @@ void taos_close(TAOS *taos) {
return;
}
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL) {
if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1;
}
if (RID_VALID(pObj->hbrid)) {
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL) {
if (RID_VALID(pHb->rpcRid)) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1;
}
tscDebug("%p HB is freed", pHb);
taosReleaseRef(tscObjRef, pHb->self);
taos_free_result(pHb);
tscDebug("%p HB is freed", pHb);
taosReleaseRef(tscObjRef, pHb->self);
taos_free_result(pHb);
}
}
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
......
......@@ -222,7 +222,7 @@ int32_t uDebugFlag = 131;
int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
uint32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL;
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
......@@ -349,11 +349,12 @@ CTaosInterface.prototype.useResult = function useResult(result) {
return fields;
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock)
if (num_of_rows == 0) {
//let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let pblock = this.libtaos.taos_fetch_row(result);
if (pblock == null) {
return {block:null, num_of_rows:0};
}
var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
......@@ -361,7 +362,6 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
let len = plen.readInt32LE(0);
......
......@@ -28,7 +28,7 @@ extern "C" {
default: \
(_v) = (_finalType)GET_INT32_VAL(_data); \
break; \
};
}
#ifdef __cplusplus
}
......
......@@ -136,7 +136,7 @@ void httpSendErrorResp(HttpContext *pContext, int32_t errNo) {
else
httpCode = 400;
if (pContext->parser->httpCode != 0) {
if (pContext->parser && pContext->parser->httpCode != 0) {
httpCode = pContext->parser->httpCode;
}
......
......@@ -33,13 +33,6 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
......@@ -72,6 +65,12 @@ typedef struct SResultRow {
union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow;
typedef struct SGroupResInfo {
int32_t rowId;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
} SGroupResInfo;
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
......@@ -89,7 +88,6 @@ typedef struct SResultRowInfo {
int32_t size:24; // number of result set
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo;
......
......@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto);
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val);
//int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHeapEntry* tHeapCreate(int32_t numOfEntries);
void tHeapSort(SHeapEntry* pEntry, int32_t len);
......
......@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
bool isPointInterpoQuery(SQuery *pQuery);
......
......@@ -53,7 +53,7 @@
#define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\
} while (0);
} while (0)
enum {
// when query starts to execute, this status will set
......@@ -178,11 +178,10 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
// todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
static int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo);
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
......@@ -195,7 +194,6 @@ static bool hasMainOutput(SQuery *pQuery);
static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
......@@ -291,7 +289,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
}
}
static int32_t getMergeResultGroupId(int32_t groupIndex) {
static UNUSED_FUNC int32_t getMergeResultGroupId(int32_t groupIndex) {
int32_t base = 50000000;
return base + (groupIndex * 10000);
}
......@@ -466,16 +464,34 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 =
(int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (p1 != NULL) {
pResultRowInfo->curIndex = *p1;
SResultRow **p1 =
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
// in case of repeat scan/reverse scan, no new time window added.
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
return (p1 != NULL)? *p1:NULL;
}
if (p1 != NULL) {
for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) {
if (pResultRowInfo->pResult[i] == (*p1)) {
pResultRowInfo->curIndex = i;
existed = true;
break;
}
}
}
} else {
if (!masterscan) { // not master scan, do not add new timewindow
return NULL;
if (p1 != NULL) { // group by column query
return *p1;
}
}
if (!existed) {
// TODO refactor
// more than the capacity, reallocate the resources
if (pResultRowInfo->size >= pResultRowInfo->capacity) {
......@@ -499,17 +515,23 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResultRowInfo->capacity = (int32_t)newCapacity;
}
SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool);
pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
SResultRow *pResult = NULL;
if (p1 == NULL) {
pResult = getNewResultRow(pRuntimeEnv->pool);
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES);
} else {
pResult = *p1;
}
// add a new result set for a new group
pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
pResultRowInfo->curIndex = pResultRowInfo->size++;
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes),
(char *)&pResultRowInfo->curIndex, sizeof(int32_t));
}
// too many time window in query
......@@ -591,7 +613,6 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
if (pData->num >= numOfRowsPerPage) {
// release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
......@@ -614,24 +635,20 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0;
}
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow** pResult, int64_t groupId) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// todo refactor
int64_t uid = getResultInfoUId(pRuntimeEnv);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
if (pResultRow == NULL) {
*newWind = false;
return masterscan? -1:0; // no master scan, no result generated means error occurs
*pResult = NULL;
return TSDB_CODE_SUCCESS;
}
*newWind = true;
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, pBockInfo->tid, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->numOfRowsPerPage);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -701,81 +718,47 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep;
}
static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) {
int32_t i = 0;
static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery) {
int64_t skey = TSKEY_INITIAL_VAL;
int32_t numOfClosed = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i];
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
numOfClosed += 1;
continue;
break;
}
TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pWindowResInfo, i);
// new closed result rows
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
break;
}
}
// all windows are closed, set the last one to be the skey
// all result rows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
}
return numOfClosed;
}
/**
* NOTE: the query status only set for the first scan of master scan.
*/
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// no qualified results exist, abort check
int32_t numOfClosed = 0;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// query completed
if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) {
closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery);
// the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pQuery->rec.threshold) {
qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
} else {
qDebug("QInfo:%p total result window:%d already closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size,
numOfClosed);
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
}
}
// output has reached the limitation, set query completed
if (pQuery->limit.limit > 0 && (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosed &&
pRuntimeEnv->scanFlag == MASTER_SCAN) {
setQueryStatus(pQuery, QUERY_COMPLETED);
pResultRowInfo->curIndex = i + 1; // current not closed result object
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
}
}
assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL);
return numOfClosed;
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) {
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo);
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
} else {
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey, ascQuery);
}
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
......@@ -818,52 +801,47 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num;
}
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset,
int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery * pQuery = pRuntimeEnv->pQuery;
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool hasPrev = pCtx[0].preAggVals.isSet;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
// restore it
pCtx[k].preAggVals.isSet = hasPrev;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
// restore it
pCtx[k].preAggVals.isSet = hasPrev;
}
}
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset) {
SQuery * pQuery = pRuntimeEnv->pQuery;
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
......@@ -1124,6 +1102,45 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY
return ts;
}
static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray *pDataBlock,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
if (!pRuntimeEnv->timeWindowInterpo) {
return;
}
assert(pDataBlock != NULL);
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (TSKEY *)(pColInfo->pData);
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
}
}
/**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv
......@@ -1139,7 +1156,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
TSKEY *tsCols = NULL;
if (pDataBlock != NULL) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
......@@ -1164,80 +1183,46 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) {
tfree(sasArray);
return;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
goto _end;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
// in case of repeat scan/reverse scan, no new time window added.
if (hasTimeWindow) {
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) {
SResultRow *pRes = pWindowResInfo->pResult[j];
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) {
SResultRow *pRes = pWindowResInfo->pResult[j];
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
}
// window start key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
int32_t startRowIndex = pQuery->pos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = pQuery->pos + (forwardStep - 1) * step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
}
}
bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
assert (ret == TSDB_CODE_SUCCESS);
}
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
......@@ -1246,50 +1231,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
if (!hasTimeWindow) {
continue;
}
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
// window start(end) key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1)*step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
}
}
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
pWindowResInfo->curIndex = index;
} else {
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
......@@ -1305,6 +1259,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
}
_end:
if (pRuntimeEnv->timeWindowInterpo) {
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock);
}
......@@ -1325,8 +1280,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return -1;
}
int32_t GROUPRESULTID = 1;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
......@@ -1342,11 +1295,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
uint64_t uid = groupIndex;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid);
if (pResultRow == NULL) {
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
......@@ -1363,7 +1313,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
}
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
}
......@@ -1498,7 +1448,7 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]);
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[index]);
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
......@@ -1565,6 +1515,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
int64_t groupId = item->groupIndex;
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
......@@ -1635,15 +1587,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
if (!hasTimeWindow) {
continue;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // null data, too many state code
goto _end;
}
// window start key interpolation
......@@ -1654,18 +1601,15 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
SResultRow *pRes = pWindowResInfo->pResult[k];
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &pRes->win, masterScan, &hasTimeWindow, &pResult);
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &pRes->win, offset);
doRowwiseApplyFunctions(pRuntimeEnv, &pRes->win, offset);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow,
&pResult);
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
......@@ -1674,8 +1618,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win);
}
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
doRowwiseApplyFunctions(pRuntimeEnv, &win, offset);
STimeWindow nextWin = win;
int32_t index = pWindowResInfo->curIndex;
......@@ -1692,16 +1635,13 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) {
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
if (hasTimeWindow) {
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
}
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset);
}
pWindowResInfo->curIndex = index;
......@@ -1736,6 +1676,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
_end:
assert(offset >= 0);
if (tsCols != NULL) {
item->lastKey = tsCols[offset] + step;
......@@ -1763,28 +1704,26 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQInfo = pQuery->current;
SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo* pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
// update the lastkey of current table
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else if (pRuntimeEnv->groupbyNormalCol) {
closeAllResultRows(pWindowResInfo);
numOfRes = pWindowResInfo->size;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
numOfRes = pResultRowInfo->size;
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
} else { // projection query
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
// update the number of output result
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
......@@ -1799,8 +1738,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (((pTableQInfo->lastKey > pTableQInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQInfo->lastKey < pTableQInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
if (((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
}
......@@ -2182,8 +2121,8 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
// todo refactor with isLastRowQuery
bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pExpr1[i].base.functionId;
if (functionID == TSDB_FUNC_INTERP) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_INTERP) {
return true;
}
}
......@@ -2618,6 +2557,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
*status = BLK_DATA_NO_NEEDED;
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
SQueryCostInfo* pCost = &pRuntimeEnv->summary;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
......@@ -2634,15 +2575,13 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
......@@ -2840,13 +2779,9 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
} else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w);
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
}
......@@ -2916,13 +2851,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
closeAllResultRows(&pRuntimeEnv->windowResInfo);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
} else {
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
closeAllResultRows(&pRuntimeEnv->windowResInfo);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
}
return 0;
......@@ -3027,7 +2958,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
}
}
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) {
static UNUSED_FUNC void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
......@@ -3176,19 +3107,18 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
typedef struct SCompSupporter {
STableQueryInfo **pTableQueryInfo;
int32_t * position;
SQInfo * pQInfo;
int32_t *rowIndex;
int32_t order;
} SCompSupporter;
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
SCompSupporter * supporter = (SCompSupporter *)param;
SQueryRuntimeEnv *pRuntimeEnv = &supporter->pQInfo->runtimeEnv;
int32_t leftPos = supporter->position[left];
int32_t rightPos = supporter->position[right];
int32_t leftPos = supporter->rowIndex[left];
int32_t rightPos = supporter->rowIndex[right];
/* left source is exhausted */
if (leftPos == -1) {
......@@ -3200,53 +3130,55 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
return -1;
}
SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
STableQueryInfo** pList = supporter->pTableQueryInfo;
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
TSKEY leftTimestamp = GET_INT64_VAL(b1);
SResultRowInfo *pWindowResInfo1 = &(pList[left]->windowResInfo);
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
TSKEY leftTimestamp = pWindowRes1->win.skey;
SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
SResultRowInfo *pWindowResInfo2 = &(pList[right]->windowResInfo);
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2);
TSKEY rightTimestamp = GET_INT64_VAL(b2);
TSKEY rightTimestamp = pWindowRes2->win.skey;
if (leftTimestamp == rightTimestamp) {
return 0;
}
return leftTimestamp > rightTimestamp ? 1 : -1;
if (supporter->order == TSDB_ORDER_ASC) {
return (leftTimestamp > rightTimestamp)? 1:-1;
} else {
return (leftTimestamp < rightTimestamp)? 1:-1;
}
}
int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t mergeGroupResult(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
int32_t ret = TSDB_CODE_SUCCESS;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
while (pQInfo->groupIndex < numOfGroups) {
SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk
int32_t ret = mergeIntoGroupResultImpl(pGroupResInfo, group, pQInfo);
if (ret < 0) {
return -1;
}
pQInfo->groupIndex += 1;
// this group generates at least one result, return results
if (ret > 0) {
pQInfo->groupIndex += 1;
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
break;
}
assert(pQInfo->groupResInfo.numOfDataPages == 0);
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
taosArrayClear(pGroupResInfo->pRows);
pGroupResInfo->index = 0;
pGroupResInfo->rowId = 0;
}
SGroupResInfo* info = &pQInfo->groupResInfo;
if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) {
if (pQInfo->groupIndex == numOfGroups && taosArrayGetSize(pGroupResInfo->pRows) == 0) {
SET_STABLE_QUERY_OVER(pQInfo);
}
......@@ -3258,89 +3190,28 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t* index, int32_t orderType);
void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
// all results have been return to client, try next group
if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) {
pGroupResInfo->numOfDataPages = 0;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
// all results in current group have been returned to client, try next group
if (pGroupResInfo->index >= taosArrayGetSize(pGroupResInfo->pRows)) {
// current results of group has been sent to client, try next group
if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
if (mergeGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk
}
// check if all results has been sent to client
int32_t numOfGroup = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
if (pGroupResInfo->numOfDataPages == 0 && pQInfo->groupIndex == numOfGroup) {
if (taosArrayGetSize(pGroupResInfo->pRows) == 0 && pQInfo->groupIndex == numOfGroup) {
SET_STABLE_QUERY_OVER(pQInfo);
return;
}
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t id = pQInfo->groupResInfo.groupId;
SIDList list = getDataBufPagesIdList(pResultBuf, id);
int32_t offset = 0;
int32_t numOfCopiedRows = 0;
size_t size = taosArrayGetSize(list);
assert(size == pGroupResInfo->numOfDataPages);
bool done = false;
//TODO add API for release none-dirty pages
// SPageInfo* prev = NULL;
for (int32_t j = pGroupResInfo->pageId; j < size; ++j) {
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
// release previous buffer pages
// if (prev == NULL) {
// prev = pi;
// } else {
// if (prev->pageId != pi->pageId) {
// releaseResBufPageInfo(pResultBuf, prev);
// prev = pi;
// }
// }
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num);
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId);
if (numOfRes > pQuery->rec.capacity - offset) {
numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset);
pGroupResInfo->rowId += numOfCopiedRows;
done = true;
} else {
numOfCopiedRows = (int32_t)pData->num;
pGroupResInfo->pageId += 1;
pGroupResInfo->rowId = 0;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char * pDest = pQuery->sdata[i]->data;
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage,
(size_t)bytes * numOfCopiedRows);
}
offset += numOfCopiedRows;
if (done) {
break;
}
}
assert(pQuery->rec.rows == 0);
pQuery->rec.rows += offset;
int32_t size = (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
pQuery->rec.rows = doCopyToSData(pQInfo, pGroupResInfo->pRows->pData, (int32_t) size, &pGroupResInfo->index, TSDB_ORDER_ASC);
}
int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
......@@ -3368,155 +3239,99 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResu
return 0;
}
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata;
int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
if (pTableList == NULL || posList == NULL) {
tfree(posList);
tfree(pTableList);
posList = calloc(size, sizeof(int32_t));
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL) {
qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
// todo opt for the case of one table per group
int32_t numOfTables = 0;
SIDList pageList = NULL;
int32_t tid = -1;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables++] = item;
tid = TSDB_TABLEID(item->pTable)->tid;
pageList = list;
STableQueryInfo *item = taosArrayGetP(pTableList, i);
if (item->windowResInfo.size > 0) {
pTableQueryInfoList[numOfTables++] = item;
}
}
// there is no data in current group
// no need to merge results since only one table in each group
if (numOfTables == 0) {
tfree(posList);
tfree(pTableList);
return 0;
} else if (numOfTables == 1) { // no need to merge results since only one table in each group
tfree(posList);
tfree(pTableList);
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList);
pGroupResInfo->groupId = tid;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
return pGroupResInfo->numOfDataPages;
goto _end;
}
SCompSupporter cs = {pTableList, posList, pQInfo};
SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool);
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQuery->order.order};
pQInfo->groupResInfo.groupId = getMergeResultGroupId(pQInfo->groupIndex);
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
// todo add windowRes iterator
int64_t lastTimestamp = -1;
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
int64_t startt = taosGetTimestampMs();
while (1) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
tfree(pTableList);
tfree(posList);
tfree(pTree);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
code = TSDB_CODE_TSC_QUERY_CANCELLED;
goto _end;
}
int32_t pos = pTree->pNode[0].index;
SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
int32_t tableIndex = pTree->pNode[0].index;
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
TSKEY ts = GET_INT64_VAL(b);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->windowResInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
assert(ts == pWindowRes->win.skey);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) {
cs.position[pos] += 1;
cs.rowIndex[tableIndex] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// all input sources are exhausted
if (--numOfTables == 0) {
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
if (--numOfTables == 0) { // all input sources are exhausted
break;
}
}
} else {
if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer
if (buffer[0]->num == pQuery->rec.capacity) {
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
}
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->num += 1;
if (pWindowRes->win.skey != lastTimestamp) {
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
pWindowRes->numOfRows = (uint32_t) num;
}
lastTimestamp = ts;
lastTimestamp = pWindowRes->win.skey;
// move to the next element of current entry
int32_t currentPageId = pWindowRes->pageId;
cs.position[pos] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if (--numOfTables == 0) {
if ((--numOfTables) == 0) {
break;
}
} else {
// current page is not needed anymore
SResultRow *pNextWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
if (pNextWindowRes->pageId != currentPageId) {
releaseResBufPage(pRuntimeEnv->pResultBuf, page);
}
}
}
tLoserTreeAdjust(pTree, pos + pTree->numOfEntries);
}
if (buffer[0]->num != 0) { // there are data in buffer
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo);
tfree(pTree);
tfree(pTableList);
tfree(posList);
return -1;
}
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries);
}
int64_t endt = taosGetTimestampMs();
......@@ -3527,65 +3342,16 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pQInfo, pQInfo->groupIndex, endt - startt);
tfree(pTableList);
_end:
tfree(pTableQueryInfoList);
tfree(posList);
tfree(pTree);
// tfree(pResultInfo);
// tfree(buf);
return pQInfo->groupResInfo.numOfDataPages;
}
int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
int32_t pageId = -1;
int32_t capacity = pResultBuf->numOfRowsPerPage;
int32_t remain = (int32_t) pQuery->sdata[0]->num;
int32_t offset = 0;
while (remain > 0) {
int32_t rows = (remain > capacity)? capacity:remain;
assert(rows > 0);
// get the output buffer page
tFilePage *buf = getNewDataBuf(pResultBuf, pGroupResInfo->groupId, &pageId);
buf->num = rows;
// pagewisely copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char* output = buf->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage;
char* src = ((char *) pQuery->sdata[i]->data) + offset * bytes;
memcpy(output, src, (size_t)(buf->num * bytes));
}
offset += rows;
remain -= rows;
pGroupResInfo->numOfDataPages += 1;
if (code != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, code);
}
return TSDB_CODE_SUCCESS;
}
void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes;
pCtx[k].size = 1;
pCtx[k].startOffset = 0;
pCtx[k].resultInfo = getResultCell(pRuntimeEnv, pRow, k);
pQuery->sdata[k]->num = 0;
}
return code;
}
static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
......@@ -3706,7 +3472,7 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t tid = 0;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -3838,12 +3604,8 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getResultRow(pWindowResInfo, i);
if (!pResult->closed) {
continue;
}
setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
......@@ -4127,9 +3889,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
......@@ -4142,12 +3904,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return;
}
uint64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid);
if (pResultRow == NULL) {
return;
}
assert (pResultRow != NULL);
/*
* not assign result buffer yet, add new result buffer
......@@ -4293,7 +4053,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
* In ascending query, key is the first qualified timestamp. However, in the descending order query, additional
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
* operations involve.
*/
STimeWindow w = TSWINDOW_INITIALIZER;
......@@ -4302,7 +4062,6 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
TSKEY sk = MIN(win.skey, win.ekey);
TSKEY ek = MAX(win.skey, win.ekey);
getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w);
pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -4341,36 +4100,33 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
return loadPrimaryTS;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) {
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t *index, int32_t orderType) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t numOfResult = 0;
int32_t startIdx = 0;
int32_t start = 0;
int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSet = numOfClosedResultRows(pResultInfo);
SResultRow** result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->groupIndex;
start = (*index);
step = 1;
} else { // desc order copy all data
startIdx = totalSet - pQInfo->groupIndex - 1;
start = numOfRows - (*index) - 1;
step = -1;
}
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i]->numOfRows == 0) {
pQInfo->groupIndex += 1;
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
if (pRows[i]->numOfRows == 0) {
(*index) += 1;
pGroupResInfo->rowId = 0;
continue;
}
int32_t numOfRowsToCopy = result[i]->numOfRows - pGroupResInfo->rowId;
int32_t numOfRowsToCopy = pRows[i]->numOfRows - pGroupResInfo->rowId;
int32_t oldOffset = pGroupResInfo->rowId;
/*
......@@ -4382,16 +4138,16 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_
pGroupResInfo->rowId += numOfRowsToCopy;
} else {
pGroupResInfo->rowId = 0;
pQInfo->groupIndex += 1;
(*index) += 1;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i]->pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRows[i]->pageId);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
char *out = pQuery->sdata[j]->data + numOfResult * size;
char *in = getPosInResultPage(pRuntimeEnv, j, result[i], page);
char *in = getPosInResultPage(pRuntimeEnv, j, pRows[i], page);
memcpy(out, in + oldOffset * size, size * numOfRowsToCopy);
}
......@@ -4422,10 +4178,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType);
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo->pResult, pResultInfo->size, &pQInfo->groupIndex, orderType);
pQuery->rec.rows += numOfResult;
assert(pQuery->rec.rows <= pQuery->rec.capacity);
}
......@@ -4457,25 +4212,17 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
SResultRowInfo * pResultRowInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// TODO refactor
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery);
}
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
}
}
......@@ -4809,13 +4556,10 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
}
} else {
getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &w);
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
......@@ -5766,6 +5510,7 @@ static void doRestoreContext(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTsBuf != NULL) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
......@@ -5805,9 +5550,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
*/
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif
} else {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
......@@ -5852,7 +5594,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
if (mergeGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
......@@ -6024,33 +5766,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
}
}
static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQuery *pQuery = pRuntimeEnv->pQuery;
while (1) {
scanOneTableDataBlocks(pRuntimeEnv, start);
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
finalizeQueryResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query
if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) && pQuery->limit.offset > 0 &&
pQuery->fillType == TSDB_FILL_NONE) {
// maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset));
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c);
pQuery->limit.offset -= c;
}
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_RESBUF_FULL)) {
break;
}
}
}
// handle time interval query on table
static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
......@@ -6058,7 +5773,6 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
int32_t numOfFilled = 0;
TSKEY newStartKey = TSKEY_INITIAL_VAL;
// skip blocks without load the actual data block from file if no filter condition present
......@@ -6070,59 +5784,39 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
}
while (1) {
tableIntervalProcessImpl(pRuntimeEnv, newStartKey);
scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
finalizeQueryResult(pRuntimeEnv);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
}
// skip offset result rows
pQuery->rec.rows = 0;
// no result generated, abort
if (pQuery->rec.rows == 0 || pRuntimeEnv->groupbyNormalCol) {
break;
if (pQuery->fillType == TSDB_FILL_NONE) {
// all data scanned, the group by normal column can return
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
if (pQuery->limit.offset > numOfClosed) {
return;
}
doSecondaryArithmeticProcess(pQuery);
// the offset is handled at prepare stage if no interpolation involved
if (pQuery->fillType == TSDB_FILL_NONE) {
limitResults(pRuntimeEnv);
break;
} else {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage**) pQuery->sdata);
numOfFilled = 0;
pQInfo->groupIndex = (int32_t) pQuery->limit.offset;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv);
break;
}
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
doSecondaryArithmeticProcess(pQuery);
// no result generated yet, continue retrieve data
pQuery->rec.rows = 0;
}
}
limitResults(pRuntimeEnv);
} else {
// all data scanned, the group by normal column can return
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
// maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
doSecondaryArithmeticProcess(pQuery);
if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) {
// skip offset result rows
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata);
pQuery->rec.rows = 0;
pQInfo->groupIndex = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
int32_t numOfFilled = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
doSecondaryArithmeticProcess(pQuery);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv);
}
}
......@@ -6133,7 +5827,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
if (queryHasRemainResForTableQuery(pRuntimeEnv)) {
if (pQuery->fillType != TSDB_FILL_NONE) {
/*
* There are remain results that are not returned due to result interpolation
......@@ -6150,23 +5843,23 @@ static void tableQueryImpl(SQInfo *pQInfo) {
return;
} else {
pQuery->rec.rows = 0;
pQInfo->groupIndex = 0; // always start from 0
assert(pRuntimeEnv->windowResInfo.size > 0);
if (pRuntimeEnv->windowResInfo.size > 0) {
if (pQInfo->groupIndex < pRuntimeEnv->windowResInfo.size) {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
}
// there are not data remains
if (pRuntimeEnv->windowResInfo.size <= 0) {
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
}
if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pQuery->rec.rows,
pQuery->rec.total);
}
return;
}
// there are not data remains
if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) {
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total);
}
return;
}
}
......@@ -7284,6 +6977,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosHashCleanup(pQInfo->arrTableIdInfo);
taosArrayDestroy(pQInfo->groupResInfo.pRows);
pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo);
......@@ -7703,7 +7398,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->completed = 1; // notify no more result to client
} else {
*continueExec = true;
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
qDebug("QInfo:%p has more results to retrieve", pQInfo);
}
return pQInfo->code;
......@@ -7981,9 +7676,9 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
SQueryMgmt* pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
// pthread_mutex_lock(&pQueryMgmt->lock);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
// pthread_mutex_unlock(&pQueryMgmt->lock);
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
}
......@@ -8021,9 +7716,9 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
return NULL;
}
// pthread_mutex_lock(&pQueryMgmt->lock);
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
// pthread_mutex_unlock(&pQueryMgmt->lock);
pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
......@@ -8031,7 +7726,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
// pthread_mutex_unlock(&pQueryMgmt->lock);
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
......@@ -8068,6 +7763,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
}
\ No newline at end of file
......@@ -120,6 +120,7 @@
//}
static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val);
static int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHistogramInfo* tHistogramCreate(int32_t numOfEntries) {
/* need one redundant slot */
......@@ -158,8 +159,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
}
#if defined(USE_ARRAYLIST)
int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries);
int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL);
if ((*pHisto)->elems[idx].val == val && idx >= 0) {
(*pHisto)->elems[idx].num += 1;
......@@ -356,7 +357,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
return 0;
}
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t end = len - 1;
int32_t start = 0;
......@@ -466,7 +467,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) {
*/
int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#if defined(USE_ARRAYLIST)
int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v);
int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v);
if (pHisto->elems[slotIdx].val != v) {
slotIdx -= 1;
......
......@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0;
pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
}
......@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
char *key = NULL;
int16_t bytes = -1;
......@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]->closed) {
SResultRow* pRow = pResultRowInfo->pResult[i];
if (pRow->closed) {
continue;
}
pResultRowInfo->pResult[i]->closed = true;
pRow->closed = true;
}
}
......@@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p);
return NULL;
}
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
if (!pRuntimeEnv->stableQuery) {
return 0; // for simple table query, the uid is always set to be 0;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
return 0;
}
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable);
return id->uid;
}
\ No newline at end of file
......@@ -31,19 +31,19 @@
extern "C" {
#endif
extern int tsdbDebugFlag;
extern uint32_t tsdbDebugFlag;
#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }}
#define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }}
#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }}
#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }}
#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
......
......@@ -956,9 +956,9 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
......@@ -1704,7 +1704,32 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return TSDB_CODE_SUCCESS;
}
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
while(1) {
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getFirstFileDataBlock(pQueryHandle, exists);
} else { // next block of the same file
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
}
}
}
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur;
......@@ -1789,7 +1814,23 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
assert(cur != NULL && numOfBlocks > 0);
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -1800,14 +1841,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
if (!pQueryHandle->locateStart) {
pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
return getDataBlocksInFilesImpl(pQueryHandle, exists);
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
// check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
......@@ -1815,27 +1856,26 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// current block is done, try next
if ((!cur->mixBlock) || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else {
// next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
// all data blocks in current file has been checked already, try next file if exists
} else {
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo);
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos,
pQueryHandle->qinfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
*exists = (pQueryHandle->realNumOfRows > 0);
return code;
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
}
// current block is empty, try next block in file
// all data blocks in current file has been checked already, try next file if exists
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
moveToNextDataBlockInCurrentFile(pQueryHandle);
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pQueryHandle, pNext, exists);
}
}
}
......
......@@ -52,6 +52,8 @@ void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system
int taosListRef();
#define RID_VALID(x) ((x) > 0)
/* sample code to iterate the refs
void demoIterateRefs(int rsetId) {
......
......@@ -547,13 +547,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
return;
}
__cache_wr_lock(pCacheObj);
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->prev = NULL;
pElem->prev = NULL;
pElem->next = NULL;
pNode->inTrashcan = true;
pNode->pTNodeHeader = pElem;
__cache_wr_lock(pCacheObj);
pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem;
......@@ -563,8 +564,8 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pCacheObj->numOfElemsInTrash++;
__cache_unlock(pCacheObj);
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name,
pNode->key, pNode->data, pElem, pCacheObj->numOfElemsInTrash);
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
pNode->data, pElem, pCacheObj->numOfElemsInTrash);
}
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
......
......@@ -280,10 +280,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
tSkipListRLock(pSkipList);
if (iter->order == TSDB_ORDER_ASC) {
if (iter->cur == pSkipList->pTail) {
// no data in the skip list
if (iter->cur == pSkipList->pTail || iter->next == NULL) {
iter->cur = pSkipList->pTail;
tSkipListUnlock(pSkipList);
return false;
}
iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
......@@ -295,9 +298,11 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
iter->step++;
} else {
if (iter->cur == pSkipList->pHead) {
iter->cur = pSkipList->pHead;
tSkipListUnlock(pSkipList);
return false;
}
iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
......
......@@ -307,8 +307,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP;
} else {
void *h1 = qGetResultRetrieveMsg(*qhandle);
assert(h1 == NULL);
//void *h1 = qGetResultRetrieveMsg(*qhandle);
/* remove this assert, one possible case that will cause h1 not NULL: query thread unlock pQInfo->lock, and then FETCH thread execute twice before query thread reach here */
//assert(h1 == NULL);
freehandle = qQueryCompleted(*qhandle);
}
......
......@@ -50,12 +50,7 @@ pipeline {
agent{label 'master'}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh b1
......@@ -82,53 +77,26 @@ pipeline {
./handle_crash_gen_val_log.sh
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh b2
date
'''
}
}
stage('test_valgrind') {
agent{label "186"}
steps {
pre_test()
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
date
cd ${WKC}/tests
./test-all.sh b3
date'''
}
}
stage('connector'){
agent{label "release"}
steps{
sh'''
cd ${WORKSPACE}
git checkout develop
systemctl start taosd
sleep 10
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/gotest
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single -DskipTests >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
......@@ -138,9 +106,41 @@ pipeline {
dotnet run
'''
}
sh '''
systemctl stop taosd
cd ${WKC}/tests
./test-all.sh b2
date
'''
}
}
stage('test_valgrind') {
agent{label "186"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
nohup taosd >/dev/null &
sleep 10
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
}
}
date
cd ${WKC}/tests
./test-all.sh b3
date'''
}
}
stage('arm64_build'){
agent{label 'arm64'}
steps{
......
......@@ -388,7 +388,9 @@ class ConcurrentInquiry:
print(
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
loop -= 1
if loop == 0: break
......@@ -415,7 +417,9 @@ class ConcurrentInquiry:
print(
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
loop -= 1
if loop == 0: break
......
......@@ -5,9 +5,10 @@ GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
nohup /var/lib/jenkins/workspace/TDinternal/debug/build/bin/taosd -c /var/lib/jenkins/workspace/TDinternal/community/sim/dnode1/cfg >/dev/null &
#nohup /var/lib/jenkins/workspace/TDinternal/debug/build/bin/taosd -c /var/lib/jenkins/workspace/TDinternal/community/sim/dnode1/cfg >/dev/null &
nohup /root/TDinternal/debug/build/bin/taosd -c /root/TDinternal/community/sim/dnode1/cfg >/dev/null &
./crash_gen.sh --valgrind -p -t 10 -s 250 -b 4
pidof taosd|xargs kill
pidof taosd|xargs kill -9
grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log
for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'`
......@@ -31,4 +32,4 @@ if [ -n "$defiMemError" ]; then
exit 8
fi
fi
done
\ No newline at end of file
done
......@@ -43,7 +43,8 @@ class TDTestCase:
print("==============step2")
tdDnodes.stopAll()
filename = '/var/lib/taos/mnode/wal/wal0'
path = tdDnodes.getDnodesRootDir()
filename = path + '/dnode1/data/mnode/wal/wal0'
with open(filename, 'rb') as f1:
temp = f1.read()
......@@ -57,12 +58,12 @@ class TDTestCase:
print("==============step3")
tdSql.execute("use demo;")
tdSql.query('show tables;')
tdSql.checkRows(10)
tdSql.checkRows(9)
for i in range(11,21):
tdSql.execute("CREATE table if not exists test{num} using meters tags({num});".format(num=i))
tdSql.query('show tables;')
tdSql.checkRows(20)
tdSql.checkRows(19)
print("==============check table numbers and create 10 tables")
......
......@@ -144,4 +144,20 @@ if $data03 != 319 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
print ===================> TD-2488
sql create table m1(ts timestamp, k int) tags(a int);
sql create table t1 using m1 tags(1);
sql create table t2 using m1 tags(2);
sql insert into t1 values('2020-1-1 1:1:1', 1);
sql insert into t1 values('2020-1-1 1:10:1', 2);
sql insert into t2 values('2020-1-1 1:5:1', 99);
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql select ts from m1 where ts='2020-1-1 1:5:1'
if $rows != 1 then
return -1
endi
\ No newline at end of file
......@@ -31,11 +31,17 @@ function runSimCaseOneByOnefq {
case=`echo $line | grep sim$ |awk '{print $NF}'`
start_time=`date +%s`
./test.sh -f $case > /dev/null 2>&1 && \
IN_TDINTERNAL="community"
if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]]; then
./test.sh -f $case > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
( grep 'script.*success.*m$' ../../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN}$case success${NC}" | tee -a out.log ) || echo -e "${RED}$case failed${NC}" | tee -a out.log
else
./test.sh -f $case > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
( grep 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN}$case success${NC}" | tee -a out.log ) || echo -e "${RED}$case failed${NC}" | tee -a out.log
fi
out_log=`tail -1 out.log `
if [[ $out_log =~ 'failed' ]];then
exit 8
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册