提交 6c30d170 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 24f8aebb
...@@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t ...@@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj); void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid); STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid);
......
...@@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) { ...@@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) {
taosMemoryFreeClear(pTscObj); taosMemoryFreeClear(pTscObj);
} }
void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) { void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI ...@@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return NULL; return NULL;
} }
pObj->connType = connType;
pObj->pAppInfo = pAppInfo; pObj->pAppInfo = pAppInfo;
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "tref.h" #include "tref.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
...@@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe ...@@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType) { SAppInstInfo* pAppInfo, int connType) {
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
if (NULL == pTscObj) { if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj; return pTscObj;
...@@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL; return NULL;
} }
SMsgSendInfo* body = buildConnectMsg(pRequest, connType); SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
...@@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj; return pTscObj;
} }
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) { if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { ...@@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
} }
taosMemoryFreeClear(db); taosMemoryFreeClear(db);
connectReq.connType = connType; connectReq.connType = pObj->connType;
connectReq.pid = htonl(appInfo.pid); connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime); connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user)); tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd)); tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
......
...@@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) { ...@@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) {
} }
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
// TODO if (taos == NULL || sql == NULL) {
// todo directly call fp
}
taos_query_l(taos, sql, (int32_t) strlen(sql));
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
...@@ -62,11 +62,6 @@ enum { ...@@ -62,11 +62,6 @@ enum {
* 2. when all data within queried time window, it is also denoted as query_completed * 2. when all data within queried time window, it is also denoted as query_completed
*/ */
TASK_COMPLETED = 0x2u, TASK_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
TASK_OVER = 0x4u,
}; };
typedef struct SResultRowCell { typedef struct SResultRowCell {
...@@ -288,12 +283,6 @@ typedef struct SOperatorInfo { ...@@ -288,12 +283,6 @@ typedef struct SOperatorInfo {
SOperatorFpSet fpSet; SOperatorFpSet fpSet;
} SOperatorInfo; } SOperatorInfo;
typedef struct {
int32_t numOfTags;
int32_t numOfCols;
SColumnInfo* colList;
} SQueriedTableInfo;
typedef enum { typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_READY = 0x2,
...@@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf ...@@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
...@@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE ...@@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
#if 0 #if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
#endif #endif
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
...@@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo); ...@@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model); EOPTR_EXEC_MODEL model);
......
...@@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { ...@@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER); return isTaskKilled(pTaskInfo);
} }
void qDestroyTask(qTaskInfo_t qTaskHandle) { void qDestroyTask(qTaskInfo_t qTaskHandle) {
......
...@@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); ...@@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
...@@ -922,6 +921,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -922,6 +921,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
// todo disable this // todo disable this
// if (pResultRow->key == NULL) { // if (pResultRow->key == NULL) {
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData)); // pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
// varDataCopy(pResultRow->key, pData); // varDataCopy(pResultRow->key, pData);
...@@ -1451,8 +1451,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in ...@@ -1451,8 +1451,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in
} }
} }
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, // static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) { // SqlFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -2009,8 +2007,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { ...@@ -2009,8 +2007,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
} }
// todo merged with the build group result. // todo merged with the build group result.
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRowPosition* pPos = &pResultRowInfo->pPosition[i]; SResultRowPosition* pPos = &pResultRowInfo->pPosition[i];
...@@ -2023,17 +2021,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD ...@@ -2023,17 +2021,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
// } // }
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (!isRowEntryInitialized(pResInfo)) { if (!isRowEntryInitialized(pResInfo)) {
continue; continue;
} }
if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr.
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
if (pRow->numOfRows < pResInfo->numOfRes) { if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes; pRow->numOfRows = pResInfo->numOfRes;
} }
...@@ -3682,10 +3674,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3682,10 +3674,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOptrBasicInfo* pInfo = &pAggInfo->binfo;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
bool newgroup = true;
while (1) { while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
...@@ -3698,6 +3688,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3698,6 +3688,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// } // }
int32_t order = getTableScanOrder(pOperator);
// there is an scalar expression that needs to be calculated before apply the group aggregation. // there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) { if (pAggInfo->pScalarExprInfo != NULL) {
int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
...@@ -3730,8 +3722,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3730,8 +3722,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
closeAllResultRows(&pAggInfo->binfo.resultRowInfo); closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo,
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); pAggInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -4014,7 +4006,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -4014,7 +4006,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
} }
// todo dynamic set tags // todo set tags
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) { // if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
...@@ -4028,7 +4021,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -4028,7 +4021,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
pProjectInfo->pPseudoColInfo); pProjectInfo->pPseudoColInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
...@@ -4279,11 +4271,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf ...@@ -4279,11 +4271,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j); STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
// pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey; pTQueryInfo->lastKey = pk->lastKey;
// pTQueryInfo->groupIndex = i;
} }
} }
...@@ -4302,7 +4291,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -4302,7 +4291,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t numOfRows = 1; int32_t numOfRows = 10;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
...@@ -4313,9 +4302,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -4313,9 +4302,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
pOperator->resultInfo.capacity = 4096;
pOperator->resultInfo.threshold = 4096 * 0.75;
int32_t numOfGroup = 10; // todo replaced with true value int32_t numOfGroup = 10; // todo replaced with true value
pInfo->groupId = INT32_MIN; pInfo->groupId = INT32_MIN;
initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup); initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup);
...@@ -4545,42 +4531,6 @@ _error: ...@@ -4545,42 +4531,6 @@ _error:
return NULL; return NULL;
} }
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExpr->pParam[0].pCol->type)) {
if (pExpr->pParam[0].pCol->colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
}
while (j < pTableInfo->numOfTags) {
if (pExpr->pParam[0].pCol->colId == pTagCols[j].colId) {
return j;
}
j += 1;
}
} /*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data
return TSDB_UD_COLUMN_INDEX;
} else {
while (j < pTableInfo->numOfCols) {
if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) {
return j;
}
j += 1;
}
}*/
return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value
}
bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols);
return j != INT32_MIN;
}
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
const char* name) { const char* name) {
SResSchema s = {0}; SResSchema s = {0};
...@@ -4739,7 +4689,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu ...@@ -4739,7 +4689,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
static SArray* createSortInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList);
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SInterval interval = { SInterval interval = {
...@@ -5240,28 +5190,6 @@ _complete: ...@@ -5240,28 +5190,6 @@ _complete:
return code; return code;
} }
static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs,
int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t functId = getExprFunctionId(&pExprs[i]);
if (functId == FUNCTION_TOP || functId == FUNCTION_BOTTOM) {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pTableInfo->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG;
} else {
SColumnInfo* pCol = &pTableInfo->colList[j];
// int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i,
// &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes,
// &pExprs[i].base.interBytes, tagLen, superTable, NULL);
// assert(ret == TSDB_CODE_SUCCESS);
}
}
}
return TSDB_CODE_SUCCESS;
}
void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
......
...@@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowCellInfoOffset);
// if (!stableQuery) { // finalize include the update of result rows // if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else { // } else {
......
...@@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
...@@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册