未验证 提交 cca70151 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4681 from taosdata/feature/query

Feature/query
......@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
return;
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
return;
}
......
......@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
}
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (SAPercentileInfo*) pCtx->aOutputBuf;
pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else {
return GET_ROWCELL_INTERBUF(pResInfo);
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
}
buildHistogramInfo(pInfo);
return pInfo;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
......@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true;
}
......@@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
......
......@@ -221,7 +221,7 @@ int32_t uDebugFlag = 131;
int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
uint32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL;
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
......@@ -349,11 +349,12 @@ CTaosInterface.prototype.useResult = function useResult(result) {
return fields;
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock)
if (num_of_rows == 0) {
//let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let pblock = this.libtaos.taos_fetch_row(result);
if (pblock == null) {
return {block:null, num_of_rows:0};
}
var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
......@@ -361,7 +362,6 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
let len = plen.readInt32LE(0);
......
......@@ -28,7 +28,7 @@ extern "C" {
default: \
(_v) = (_finalType)GET_INT32_VAL(_data); \
break; \
};
}
#ifdef __cplusplus
}
......
......@@ -33,13 +33,6 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
......@@ -72,6 +65,12 @@ typedef struct SResultRow {
union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow;
typedef struct SGroupResInfo {
int32_t rowId;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
} SGroupResInfo;
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
......@@ -89,7 +88,6 @@ typedef struct SResultRowInfo {
int32_t size:24; // number of result set
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo;
......
......@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto);
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val);
//int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHeapEntry* tHeapCreate(int32_t numOfEntries);
void tHeapSort(SHeapEntry* pEntry, int32_t len);
......
......@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
bool isPointInterpoQuery(SQuery *pQuery);
......
此差异已折叠。
......@@ -120,6 +120,7 @@
//}
static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val);
static int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHistogramInfo* tHistogramCreate(int32_t numOfEntries) {
/* need one redundant slot */
......@@ -158,8 +159,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
}
#if defined(USE_ARRAYLIST)
int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries);
int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL);
if ((*pHisto)->elems[idx].val == val && idx >= 0) {
(*pHisto)->elems[idx].num += 1;
......@@ -356,7 +357,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
return 0;
}
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t end = len - 1;
int32_t start = 0;
......@@ -466,7 +467,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) {
*/
int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#if defined(USE_ARRAYLIST)
int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v);
int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v);
if (pHisto->elems[slotIdx].val != v) {
slotIdx -= 1;
......
......@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0;
pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
}
......@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
char *key = NULL;
int16_t bytes = -1;
......@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]->closed) {
SResultRow* pRow = pResultRowInfo->pResult[i];
if (pRow->closed) {
continue;
}
pResultRowInfo->pResult[i]->closed = true;
pRow->closed = true;
}
}
......@@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p);
return NULL;
}
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
if (!pRuntimeEnv->stableQuery) {
return 0; // for simple table query, the uid is always set to be 0;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
return 0;
}
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable);
return id->uid;
}
\ No newline at end of file
......@@ -31,19 +31,19 @@
extern "C" {
#endif
extern int tsdbDebugFlag;
extern uint32_t tsdbDebugFlag;
#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }}
#define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }}
#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }}
#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }}
#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
......
......@@ -956,9 +956,9 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
......@@ -1704,7 +1704,32 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return TSDB_CODE_SUCCESS;
}
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
while(1) {
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getFirstFileDataBlock(pQueryHandle, exists);
} else { // next block of the same file
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
}
}
}
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur;
......@@ -1789,7 +1814,23 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
assert(cur != NULL && numOfBlocks > 0);
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -1800,14 +1841,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
if (!pQueryHandle->locateStart) {
pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
return getDataBlocksInFilesImpl(pQueryHandle, exists);
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
// check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
......@@ -1815,27 +1856,26 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// current block is done, try next
if ((!cur->mixBlock) || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else {
// next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
// all data blocks in current file has been checked already, try next file if exists
} else {
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo);
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos,
pQueryHandle->qinfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
*exists = (pQueryHandle->realNumOfRows > 0);
return code;
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
}
// current block is empty, try next block in file
// all data blocks in current file has been checked already, try next file if exists
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
moveToNextDataBlockInCurrentFile(pQueryHandle);
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pQueryHandle, pNext, exists);
}
}
}
......
......@@ -547,13 +547,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
return;
}
__cache_wr_lock(pCacheObj);
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->prev = NULL;
pElem->prev = NULL;
pElem->next = NULL;
pNode->inTrashcan = true;
pNode->pTNodeHeader = pElem;
__cache_wr_lock(pCacheObj);
pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem;
......@@ -563,8 +564,8 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pCacheObj->numOfElemsInTrash++;
__cache_unlock(pCacheObj);
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name,
pNode->key, pNode->data, pElem, pCacheObj->numOfElemsInTrash);
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
pNode->data, pElem, pCacheObj->numOfElemsInTrash);
}
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
......
......@@ -280,10 +280,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
tSkipListRLock(pSkipList);
if (iter->order == TSDB_ORDER_ASC) {
if (iter->cur == pSkipList->pTail) {
// no data in the skip list
if (iter->cur == pSkipList->pTail || iter->next == NULL) {
iter->cur = pSkipList->pTail;
tSkipListUnlock(pSkipList);
return false;
}
iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
......@@ -295,9 +298,11 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
iter->step++;
} else {
if (iter->cur == pSkipList->pHead) {
iter->cur = pSkipList->pHead;
tSkipListUnlock(pSkipList);
return false;
}
iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
......
......@@ -144,4 +144,20 @@ if $data03 != 319 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
print ===================> TD-2488
sql create table m1(ts timestamp, k int) tags(a int);
sql create table t1 using m1 tags(1);
sql create table t2 using m1 tags(2);
sql insert into t1 values('2020-1-1 1:1:1', 1);
sql insert into t1 values('2020-1-1 1:10:1', 2);
sql insert into t2 values('2020-1-1 1:5:1', 99);
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql select ts from m1 where ts='2020-1-1 1:5:1'
if $rows != 1 then
return -1
endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册