提交 ca859544 编写于 作者: H hjxilinx

[TD-32] 1. add tsdb query processing functions. 2. refactor connect to employ...

[TD-32] 1. add tsdb query processing functions. 2. refactor connect to employ async way. 3. change some variables' name
上级 cbe666b2
......@@ -296,5 +296,6 @@ ENDIF ()
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(tests)
INCLUDE(CPack)
......@@ -20,8 +20,8 @@
extern "C" {
#endif
#include <tarray.h>
#include "os.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
......@@ -33,6 +33,7 @@ extern "C" {
#include "trpc.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "tarray.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
......@@ -83,7 +84,7 @@ typedef struct STableMetaInfo {
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN + 1]; // as aliasName
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndexEx colInfo;
int64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
......@@ -104,7 +105,6 @@ typedef struct SFieldInfo {
int16_t numOfOutputCols; // number of column in result
int16_t numOfAlloc; // allocated size
TAOS_FIELD *pFields;
// short * pOffset;
/*
* define if this column is belong to the queried result, it may be add by parser to faciliate
......@@ -398,7 +398,7 @@ int32_t tscInitRpc(const char *user, const char *secret);
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs();
void tscInitMsgsFp();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
......
此差异已折叠。
......@@ -364,7 +364,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
return TSDB_CODE_SUCCESS;
}
static void freeSubqueryObj(SSqlObj* pSql) {
void freeSubqueryObj(SSqlObj* pSql) {
SSubqueryState* pState = NULL;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
......
......@@ -1201,7 +1201,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
pFuncExpr->interResBytes = sizeof(double);
pFuncExpr->resType = TSDB_DATA_TYPE_DOUBLE;
SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->pBinExprInfo;
SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo;
tSQLSyntaxNode* pNode = NULL;
SColIndexEx* pColIndex = NULL;
......
......@@ -29,9 +29,6 @@
#define TSC_MGMT_VNODE 999
int tsMasterIndex = 0;
int tsSlaveIndex = 1;
SRpcIpSet tscMgmtIpList;
SRpcIpSet tscDnodeIpSet;
......@@ -277,8 +274,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pSql->retry = 0;
if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
......@@ -327,43 +322,39 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
}
}
if (pSql->fp == NULL) {
tsem_post(&pSql->rspSem);
} else {
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
int command = pCmd->command;
void *taosres = tscKeepConn[command] ? pSql : NULL;
rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
int command = pCmd->command;
void *taosres = tscKeepConn[command] ? pSql : NULL;
rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
* may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
* tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
*
* If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
* the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
*/
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
(*pSql->fp)(pSql, taosres, rpcMsg->code);
} else {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
}
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
* may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
* tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
*
* If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
* the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
*/
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
(*pSql->fp)(pSql, taosres, rpcMsg->code);
} else {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
}
if (shouldFree) {
// If it is failed, all objects allocated during execution taos_connect_a should be released
if (command == TSDB_SQL_CONNECT) {
taos_close(pObj);
tscTrace("%p Async sql close failed connection", pSql);
} else {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql);
}
if (shouldFree) {
// If it is failed, all objects allocated during execution taos_connect_a should be released
if (command == TSDB_SQL_CONNECT) {
taos_close(pObj);
tscTrace("%p Async sql close failed connection", pSql);
} else {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql);
}
}
}
......@@ -879,25 +870,14 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// sync query, wait for the master SSqlObj to proceed
if (pPObj->fp == NULL) {
// sync query, wait for the master SSqlObj to proceed
tsem_wait(&pPObj->emptyRspSem);
tsem_wait(&pPObj->emptyRspSem);
tsem_post(&pPObj->rspSem);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
} else {
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
} else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj);
}
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
} else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj);
}
}
}
......@@ -1034,22 +1014,14 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
// only free once
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
if (pPObj->fp == NULL) {
tsem_wait(&pPObj->emptyRspSem);
tsem_wait(&pPObj->emptyRspSem);
tsem_post(&pPObj->rspSem);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
tscQueueAsyncRes(pPObj);
}
tscQueueAsyncRes(pPObj);
}
}
}
......@@ -3186,9 +3158,6 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// enforce the renew metermeta operation in async model
if (pSql->fp == NULL) pSql->fp = (void *)0x1;
/*
* 1. only update the metermeta in force model metricmeta is not updated
* 2. if get metermeta failed, still get the metermeta
......@@ -3292,42 +3261,17 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
// }
tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
if (pSql->fp == NULL) {
tsem_init(&pNew->rspSem, 0, 0);
tsem_init(&pNew->emptyRspSem, 0, 1);
code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN] = {0};
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
#ifdef _DEBUG_VIEW
printf("create metric key:%s, index:%d\n", tagstr, i);
#endif
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
pTableMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr);
}
}
tscFreeSqlObj(pNew);
} else {
pNew->fp = tscTableMetaCallBack;
pNew->param = pSql;
code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
pNew->fp = tscTableMetaCallBack;
pNew->param = pSql;
code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
return code;
}
void tscInitMsgs() {
void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
......
......@@ -31,33 +31,39 @@
#include "ttokendef.h"
#include "qast.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
STscObj *pObj;
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
return false;
}
size_t len = strlen(str);
if (len <= 0 || len > maxsize) {
return false;
}
return true;
}
static bool validUserName(const char* user) {
return validImpl(user, TSDB_USER_LEN);
}
taos_init();
static bool validPassword(const char* passwd) {
return validImpl(passwd, TSDB_PASSWORD_LEN);
}
if (user == NULL) {
STscObj *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
taos_init();
if (!validUserName(user)) {
globalCode = TSDB_CODE_INVALID_ACCT;
return NULL;
} else {
size_t len = strlen(user);
if (len <= 0 || len > TSDB_USER_LEN) {
globalCode = TSDB_CODE_INVALID_ACCT;
return NULL;
}
}
if (pass == NULL) {
if (!validPassword(pass)) {
globalCode = TSDB_CODE_INVALID_PASS;
return NULL;
} else {
size_t len = strlen(pass);
if (len <= 0 || len > TSDB_KEY_LEN) {
globalCode = TSDB_CODE_INVALID_PASS;
return NULL;
}
}
if (tscInitRpc(user, pass) != 0) {
......@@ -83,14 +89,13 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
tscMgmtIpList.port = port ? port : tsMnodeShellPort;
pObj = (STscObj *)malloc(sizeof(STscObj));
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(STscObj));
pObj->signature = pObj;
strncpy(pObj->user, user, TSDB_USER_LEN);
......@@ -115,18 +120,17 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)malloc(sizeof(SSqlObj));
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
free(pObj);
return NULL;
}
memset(pSql, 0, sizeof(SSqlObj));
pSql->pTscObj = pObj;
pSql->signature = pSql;
tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
// tsem_init(&pSql->emptyRspSem, 0, 1);
pObj->pSql = pSql;
pSql->fp = fp;
pSql->param = param;
......@@ -142,46 +146,69 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
return NULL;
}
pSql->res.code = tscProcessSql(pSql);
if (fp != NULL) {
tscTrace("%p DB async connection is opening", pObj);
return pObj;
}
if (pSql->res.code) {
taos_close(pObj);
return NULL;
}
tscTrace("%p DB connection is opened", pObj);
return pObj;
}
static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
assert(pObj != NULL && pObj->pSql != NULL);
sem_post(&pObj->pSql->rspSem);
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
ip = tsMasterIp;
}
tscTrace("try to create a connection to %s", ip);
void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL);
if (taos != NULL) {
STscObj *pObj = (STscObj *)taos;
STscObj *pObj = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL);
if (pObj != NULL) {
SSqlObj* pSql = pObj->pSql;
assert(pSql != NULL);
pSql->fp = syncConnCallback;
pSql->param = pObj;
tscProcessSql(pSql);
sem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
taos_close(pObj);
return NULL;
}
tscTrace("%p DB connection is opening", pObj);
// version compare only requires the first 3 segments of the version string
int code = taosCheckVersion(version, taos_get_server_info(taos), 3);
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
if (code != 0) {
pObj->pSql->res.code = code;
taos_close(taos);
pSql->res.code = code;
taos_close(pObj);
return NULL;
} else {
return pObj;
}
}
return taos;
return NULL;
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
return taos_connect_imp(ip, user, pass, db, port, fp, param, taos);
STscObj* pObj = taos_connect_imp(ip, user, pass, db, port, fp, param, taos);
if (pObj == NULL) {
return NULL;
}
SSqlObj* pSql = pObj->pSql;
pSql->res.code = tscProcessSql(pSql);
tscTrace("%p DB async connection is opening", pObj);
return pObj;
}
void taos_close(TAOS *taos) {
......@@ -408,14 +435,14 @@ static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SSqlFunctionExpr * pExpr = pSupport->pExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->pBinExprInfo.pReqColumns[i].name) == 0) {
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->binExprInfo.pReqColumns[i].name) == 0) {
index = i;
break;
}
}
assert(index >= 0 && index < pExpr->pBinExprInfo.numOfCols);
assert(index >= 0 && index < pExpr->binExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
}
......@@ -465,21 +492,21 @@ static void **doSetResultRowData(SSqlObj *pSql) {
sas->offset = 0;
sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
sas->numOfCols = sas->pExpr->pBinExprInfo.numOfCols;
sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->pBinExprInfo.pReqColumns[k].colIdxInBuf;
int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
......
......@@ -159,7 +159,7 @@ void taos_init_imp() {
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
}
tscInitMsgs();
tscInitMsgsFp();
slaveIndex = rand();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
......
......@@ -1061,8 +1061,8 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) {
if (pFieldInfo->pExpr[i] != NULL) {
tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->pBinExprInfo.pBinExpr, NULL);
tfree(pFieldInfo->pExpr[i]->pBinExprInfo.pReqColumns);
tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->binExprInfo.pBinExpr, NULL);
tfree(pFieldInfo->pExpr[i]->binExprInfo.pReqColumns);
tfree(pFieldInfo->pExpr[i]);
}
}
......
#ifndef TDENGINE_NAME_H
#define TDENGINE_NAME_H
#include "os.h"
#include "taosmsg.h"
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
} SDataStatis;
typedef struct SColumnInfoEx {
SColumnInfo info;
void* pData; // the corresponding block data in memory
} SColumnInfoEx;
int32_t extractTableName(const char *tableId, char *name);
char* extractDBName(const char *tableId, char *name);
......
......@@ -396,7 +396,7 @@ typedef struct SSqlBinaryExprInfo {
typedef struct SSqlFunctionExpr {
SSqlFuncExprMsg pBase;
SSqlBinaryExprInfo pBinExprInfo;
SSqlBinaryExprInfo binExprInfo;
int16_t resBytes;
int16_t resType;
int16_t interResBytes;
......
......@@ -4,6 +4,8 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc)
INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
......
......@@ -29,7 +29,7 @@ typedef struct SIDList {
int32_t* pData;
} SIDList;
typedef struct SQueryDiskbasedResultBuf {
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
......@@ -42,7 +42,7 @@ typedef struct SQueryDiskbasedResultBuf {
uint32_t numOfAllocGroupIds; // number of allocated id list
void* idsTable; // id hash table
SIDList* list; // for each id, there is a page id list
} SQueryDiskbasedResultBuf;
} SDiskbasedResultBuf;
/**
* create disk-based result buffer
......@@ -51,7 +51,7 @@ typedef struct SQueryDiskbasedResultBuf {
* @param rowSize
* @return
*/
int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize);
/**
*
......@@ -60,14 +60,14 @@ int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32
* @param pageId
* @return
*/
tFilePage* getNewDataBuf(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
int32_t getNumOfRowsPerPage(SQueryDiskbasedResultBuf* pResultBuf);
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf);
/**
*
......@@ -75,7 +75,7 @@ int32_t getNumOfRowsPerPage(SQueryDiskbasedResultBuf* pResultBuf);
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId);
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
/**
* get the specified buffer page by id
......@@ -83,27 +83,27 @@ SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t grou
* @param id
* @return
*/
tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t id);
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @return
*/
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf);
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @return
*/
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf);
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf);
/**
* destroy result buffer
* @param pResultBuf
*/
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf);
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
/**
*
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H
#include "os.h"
#include "hash.h"
#include "qinterpolation.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
typedef struct SData {
int32_t num;
char data[];
} SData;
enum {
ST_QUERY_KILLED = 0, // query killed
ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer
ST_QUERY_COMPLETED = 2, // query completed
};
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
int16_t numOfGroupCols;
SColIndexEx columnInfo[TSDB_MAX_TAGS]; // group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef struct SPosInfo {
int16_t pageId;
int16_t rowId;
} SPosInfo;
typedef struct SWindowStatus {
bool closed;
} SWindowStatus;
typedef struct SWindowResult {
uint16_t numOfRows;
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status;
} SWindowResult;
typedef struct SResultRec {
int64_t pointsTotal;
int64_t pointsRead;
} SResultRec;
typedef struct SWindowResInfo {
SWindowResult* pResult; // result list
void* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int32_t size; // number of result set
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold to pausing query and return closed results.
} SWindowResInfo;
typedef struct SColumnFilterElem {
int16_t bytes; // column length
__filter_func_t fp;
SColumnFilterInfo filterInfo;
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
SColumnInfoEx info;
int32_t numOfFilters;
SColumnFilterElem* pFilters;
void* pData;
} SSingleColumnFilterInfo;
/* intermediate pos during multimeter query involves interval */
typedef struct STableQueryInfo {
int64_t lastKey;
STimeWindow win;
int32_t numOfRes;
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
SWindowResInfo windowResInfo;
} STableQueryInfo;
typedef struct STableDataInfo {
int32_t numOfBlocks;
int32_t start; // start block index
int32_t tableIndex;
void* pMeterObj;
int32_t groupIdx; // group id in table list
STableQueryInfo* pTableQInfo;
} STableDataInfo;
typedef struct SQuery {
int16_t numOfCols;
SOrderVal order;
STimeWindow window;
int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query
char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
int16_t interpoType;
int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
SSqlFunctionExpr* pSelectExpr;
SColumnInfoEx* colList;
int32_t numOfFilterCols;
int64_t* defaultVal;
TSKEY lastKey;
uint32_t status; // query status
SResultRec rec;
int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data
SData** sdata;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
typedef struct SQueryCostSummary {
} SQueryCostSummary;
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
void* pTabObj;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
bool stableQuery; // super table query or not
void* pQueryHandle;
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;
typedef struct SQInfo {
uint64_t signature;
TSKEY startTime;
int64_t elapsedTime;
SResultRec rec;
int pointsReturned;
int pointsInterpo;
int code; // error code to returned to client
sem_t dataReady;
SHashObj* pTableList; // table list
SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
T_REF_DECLARE()
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
STableDataInfo* pTableDataInfo;
TSKEY* tsList;
} SQInfo;
/**
* create the qinfo object before adding the query task to each tsdb query worker
*
* @param pReadMsg
* @param pQInfo
* @return
*/
int32_t qCreateQueryInfo(void* pReadMsg, SQInfo** pQInfo);
/**
* query on single table
* @param pReadMsg
*/
void qTableQuery(void* pReadMsg);
/**
* query on super table
* @param pReadMsg
*/
void qSuperTableQuery(void* pReadMsg);
#endif // TDENGINE_QUERYEXECUTOR_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, int32_t numOfCols);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo);
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
#endif // TDENGINE_QUERYUTIL_H
......@@ -20,11 +20,11 @@
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
#include "os.h"
#include "trpc.h"
#include "../../common/inc/name.h"
#include "taosdef.h"
#include "trpc.h"
#include "tvariant.h"
#define TSDB_FUNC_INVALID_ID -1
......@@ -130,12 +130,8 @@ typedef struct SArithmeticSupport {
typedef struct SQLPreAggVal {
bool isSet;
int32_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int32_t size;
SDataStatis statis;
} SQLPreAggVal;
typedef struct SInterpInfoDetail {
......
......@@ -20,7 +20,6 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
//#include "tschemautil.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "ttokendef.h"
......
......@@ -7,8 +7,8 @@
#define DEFAULT_INTERN_BUF_SIZE 16384L
int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
SQueryDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SQueryDiskbasedResultBuf));
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
SDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SDiskbasedResultBuf));
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
pResBuf->numOfPages = size;
......@@ -50,17 +50,17 @@ int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32
return TSDB_CODE_SUCCESS;
}
tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t id) {
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
static int32_t extendDiskFileSize(SQueryDiskbasedResultBuf* pResultBuf, int32_t numOfPages) {
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) {
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize);
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
......@@ -88,11 +88,11 @@ static int32_t extendDiskFileSize(SQueryDiskbasedResultBuf* pResultBuf, int32_t
return TSDB_CODE_SUCCESS;
}
static bool noMoreAvailablePages(SQueryDiskbasedResultBuf* pResultBuf) {
static bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) {
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
......@@ -106,7 +106,7 @@ static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t group
return slot;
}
static int32_t addNewGroupId(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
if (pResultBuf->numOfAllocGroupIds <= num) {
......@@ -148,7 +148,7 @@ static int32_t doRegisterId(SIDList* pList, int32_t id) {
return 0;
}
static void registerPageId(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
slot = addNewGroupId(pResultBuf, groupId);
......@@ -158,7 +158,7 @@ static void registerPageId(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId
doRegisterId(pList, pageId);
}
tFilePage* getNewDataBuf(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (noMoreAvailablePages(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -177,9 +177,9 @@ tFilePage* getNewDataBuf(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId,
return page;
}
int32_t getNumOfRowsPerPage(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
SIDList list = {0};
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
......@@ -189,7 +189,7 @@ SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t grou
}
}
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) {
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
if (pResultBuf == NULL) {
return;
}
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "hash.h"
#include "taosmsg.h"
#include "qextbuffer.h"
#include "ttime.h"
#include "qinterpolation.h"
//#include "tscJoinProcess.h"
#include "ttime.h"
#include "queryExecutor.h"
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
pWindowResInfo->capacity = size;
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
pWindowResInfo->curIndex = -1;
pWindowResInfo->size = 0;
// use the pointer arraylist
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SPosInfo posInfo = {-1, -1};
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo);
}
return TSDB_CODE_SUCCESS;
}
void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if (pWindowRes == NULL) {
return;
}
for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf);
}
free(pWindowRes->resultInfo);
}
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
return;
}
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
destroyTimeWindowRes(pResult, numOfCols);
}
taosHashCleanup(pWindowResInfo->hashList);
tfree(pWindowResInfo->pResult);
}
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
return;
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[i];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->curIndex = -1;
taosHashCleanup(pWindowResInfo->hashList);
pWindowResInfo->size = 0;
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false);
pWindowResInfo->startTime = 0;
pWindowResInfo->prevSKey = 0;
}
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) {
return;
}
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
assert(num >= 0 && num <= numOfClosed);
for (int32_t i = 0; i < num; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) { // remove the window slot from hash table
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
} else {
break;
}
}
int32_t remain = pWindowResInfo->size - num;
// clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) {
copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]);
}
// move the unclosed window in the front of the window list
for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size);
taosHashPut(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
sizeof(int32_t));
}
pWindowResInfo->curIndex = -1;
}
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
return;
}
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, numOfClosed);
}
int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
int32_t i = 0;
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].status.closed) {
++i;
}
return i;
}
void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
if (pWindowResInfo->pResult[i].status.closed) {
continue;
}
pWindowResInfo->pResult[i].status.closed = true;
}
}
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
*/
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
int32_t i = 0;
while (i < pWindowResInfo->size &&
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) ||
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) {
++i;
}
// assert(i < pWindowResInfo->size);
if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1);
}
}
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
}
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
}
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
}
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->status.closed = true;
}
void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
if (pWindowRes == NULL) {
return;
}
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
memset(s, 0, size);
resetResultInfo(pResultInfo);
}
pWindowRes->numOfRows = 0;
// pWindowRes->nAlloc = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->status.closed = false;
pWindowRes->window = (STimeWindow){0, 0};
}
/**
* The source window result pos attribution of the source window result does not assign to the destination,
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
dst->numOfRows = src->numOfRows;
// dst->nAlloc = src->nAlloc;
dst->window = src->window;
dst->status = src->status;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
for (int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pDst = &dst->resultInfo[i];
SResultInfo *pSrc = &src->resultInfo[i];
char *buf = pDst->interResultBuf;
memcpy(pDst, pSrc, sizeof(SResultInfo));
pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
// copy the output buffer data from src to dst, the position info keep unchanged
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
memcpy(dstBuf, srcBuf, s);
}
}
......@@ -170,7 +170,7 @@ typedef struct SQueryRuntimeEnv {
STSCursor cur;
SQueryCostSummary summary;
bool stableQuery; // is super table query or not
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
......
......@@ -1532,7 +1532,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
return w;
}
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedResultBuf *pResultBuf, int32_t sid,
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid,
int32_t numOfRowsPerPage) {
if (pWindowRes->pos.pageId != -1) {
return 0;
......@@ -1574,7 +1574,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
STimeWindow *win) {
assert(win->skey <= win->ekey);
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
......@@ -2156,7 +2156,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
int32_t GROUPRESULTID = 1;
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
if (pWindowRes == NULL) {
......@@ -5594,7 +5594,7 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
}
}
// static tFilePage *getMeterDataPage(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo,
// static tFilePage *getMeterDataPage(SDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo,
// int32_t index) {
// SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
// return getResultBufferPageById(pResultBuf, pList.pData[index]);
......@@ -5700,7 +5700,7 @@ void copyResToQueryResultBuf(STableQuerySupportObj *pSupporter, SQuery *pQuery)
}
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t id = getGroupResultId(pSupporter->subgroupIdx - 1);
SIDList list = getDataBufPagesIdList(pResultBuf, pSupporter->offset + id);
......@@ -5883,7 +5883,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv) {
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
......
......@@ -23,6 +23,7 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "name.h"
#ifdef __cplusplus
extern "C" {
......@@ -192,7 +193,7 @@ typedef void* tsdb_query_handle_t; // Use void to hide implementation details
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc/asc order to iterate the data block
SColumnFilterInfo colFilterInfo;
SColumnInfoEx colList;
} STsdbQueryCond;
typedef struct SBlockInfo {
......@@ -205,10 +206,10 @@ typedef struct SBlockInfo {
} SBlockInfo;
// TODO: move this data struct out of the module
typedef struct SData {
int32_t num;
char * data;
} SData;
//typedef struct SData {
// int32_t num;
// char * data;
//} SData;
typedef struct SDataBlockInfo {
STimeWindow window;
......@@ -269,7 +270,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle);
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
//int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis);
/**
* The query condition with primary timestamp is passed to iterator during its constructor function,
......
......@@ -14,3 +14,5 @@
*/
#include "os.h"
#include "tsdb.h"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册