未验证 提交 34f5f3fb 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11194 from taosdata/feature/3.0_liaohj

[td-14428] return correct length for both nchar and varchar type.
......@@ -189,7 +189,7 @@ DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
......
......@@ -136,7 +136,7 @@ typedef struct STscObj {
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
} STscObj;
......@@ -152,7 +152,8 @@ typedef struct SResultColumn {
typedef struct SReqResultInfo {
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields;
TAOS_FIELD* fields; // todo, column names are not needed.
TAOS_FIELD* userFields; // the fields info that return to user
uint32_t numOfCols;
int32_t* length;
char** convertBuf;
......@@ -221,6 +222,7 @@ void destroyRequest(SRequestObj* pRequest);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void resetConnectDB(STscObj* pTscObj);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char* str);
......
......@@ -169,6 +169,7 @@ static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->row);
taosMemoryFreeClear(pResInfo->pCol);
taosMemoryFreeClear(pResInfo->fields);
taosMemoryFreeClear(pResInfo->userFields);
if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
......
......@@ -56,7 +56,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
}
char localDb[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) {
if (db != NULL && strlen(db) > 0) {
if (!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL;
......@@ -164,6 +164,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
if ((*pQuery)->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
}
TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
}
......@@ -228,12 +229,24 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
assert(pSchema != NULL && numOfCols > 0);
pResInfo->numOfCols = numOfCols;
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(pSchema[0]));
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].type = pSchema[i].type;
pResInfo->fields[i].type = pSchema[i].type;
pResInfo->userFields[i].bytes = pSchema[i].bytes;
pResInfo->userFields[i].type = pSchema[i].type;
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
}
}
......@@ -791,6 +804,16 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
taosThreadMutexUnlock(&pTscObj->mutex);
}
void resetConnectDB(STscObj* pTscObj) {
if (pTscObj == NULL) {
return;
}
taosThreadMutexLock(&pTscObj->mutex);
pTscObj->db[0] = 0;
taosThreadMutexUnlock(&pTscObj->mutex);
}
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL);
......
......@@ -146,7 +146,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
}
SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo);
return pResInfo->fields;
return pResInfo->userFields;
}
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
......@@ -365,8 +365,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
}
bool taos_is_update_query(TAOS_RES *res) {
// TODO
return true;
return taos_num_fields(res) == 0;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
......@@ -393,8 +392,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) {
// TODO
return;
if (taos == NULL) {
return;
}
resetConnectDB(taos);
}
const char *taos_get_server_info(TAOS *taos) {
......
......@@ -393,6 +393,7 @@ typedef struct STableScanInfo {
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
......@@ -501,12 +502,6 @@ typedef struct SProjectOperatorInfo {
int64_t curOutput;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
......@@ -525,11 +520,6 @@ typedef struct SSLimitOperatorInfo {
int64_t threshold;
} SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo {
SSingleColumnFilterInfo* pFilterInfo;
int32_t numOfFilterCols;
} SFilterOperatorInfo;
typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo;
SSDataBlock* pRes;
......@@ -639,7 +629,7 @@ typedef struct SDistinctOperatorInfo {
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo);
int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
......@@ -682,22 +672,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
......
......@@ -194,9 +194,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
......@@ -214,8 +211,6 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
......@@ -264,10 +259,6 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision,
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId,
SExecTaskInfo* pTaskInfo);
......@@ -3025,6 +3016,23 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
}
if (pTableScanInfo->pFilterNode != NULL) {
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
// filterSetColFieldData(pQueryAttr->pFilters, pBlock->info.numOfCols, pBlock->pDataBlock);
// if (pQueryAttr->pFilters != NULL) {
// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
// }
}
return TSDB_CODE_SUCCESS;
}
......@@ -3344,11 +3352,6 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt
offset += pLocalExprInfo->base.resSchema.bytes;
}
// todo : use index to avoid iterator all possible output columns
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo);
}
}
// set the tsBuf start position before check each data block
......@@ -3544,19 +3547,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
}
}
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity) {
SSDataBlock* pDataBlock = pBInfo->pRes;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId < 0) {
memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity));
}
}
}
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
......@@ -3714,23 +3704,6 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
return pTableQueryInfo;
}
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(1, sizeof(STableQueryInfo));
// pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
// set more initial size of interval/groupby query
int32_t initialSize = 16;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableQueryInfo);
return NULL;
}
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
......@@ -3834,30 +3807,6 @@ void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKe
pAggInfo->groupId = tableGroupId;
}
void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfCols,
int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
}
/*
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
}
}
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -4626,73 +4575,6 @@ static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_
return terrno;
}
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
void* param) {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t)(pQueryAttr->stableQuery ? GET_NUM_OF_TABLEGROUP(pRuntimeEnv) : 0);
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
if (sourceOptr != NULL) {
assert(pRuntimeEnv->proot == NULL);
pRuntimeEnv->proot = sourceOptr;
}
if (pTsBuf != NULL) {
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
}
int32_t ps = 4096;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024 * 1024 * 10;
int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
if (pQInfo->summary.queryProfEvents == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId);
}
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
if (pQInfo->summary.operatorProfResults == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId);
}
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t)pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS;
}
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
if (order == TSDB_ORDER_ASC) {
......@@ -4790,7 +4672,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
// break;
// }
//
// pRuntimeEnv->current = *pTableQueryInfo;
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
// }
......@@ -5548,7 +5429,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SExecTaskInfo* pTaskInfo) {
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
......@@ -5567,21 +5448,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
taosArrayPush(pInfo->block.pDataBlock, &idata);
}
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator";
pInfo->pFilterNode = pCondition;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
......@@ -5838,7 +5720,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
int64_t tmp = 0;
char t[10] = {0};
STR_TO_VARSTR(t, "_");
STR_TO_VARSTR(t, "_"); //TODO
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
colDataAppend(pColInfoData, numOfRows, t, false);
} else {
......@@ -6890,58 +6772,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
return NULL;
}
if (pInfo->currentOffset == 0) {
break;
} else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else { // TODO handle the data movement
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pInfo->currentOffset = 0;
break;
}
}
if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows);
pInfo->currentRows = pInfo->limit.limit;
doSetOperatorCompleted(pOperator);
} else {
pInfo->currentRows += pBlock->info.rows;
}
return pBlock;
}
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
......@@ -7791,11 +7621,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*)param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
}
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
......@@ -8810,7 +8635,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pTaskInfo);
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
......@@ -9175,41 +9000,6 @@ _complete:
return code;
}
int32_t cloneExprFilterInfo(SColumnFilterInfo** dst, SColumnFilterInfo* src, int32_t filterNum) {
if (filterNum <= 0) {
return TSDB_CODE_SUCCESS;
}
*dst = taosMemoryCalloc(filterNum, sizeof(*src));
if (*dst == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(*dst, src, sizeof(*src) * filterNum);
for (int32_t i = 0; i < filterNum; i++) {
if ((*dst)[i].filterstr && dst[i]->len > 0) {
void* pz = taosMemoryCalloc(1, (size_t)(*dst)[i].len + 1);
if (pz == NULL) {
if (i == 0) {
taosMemoryFree(*dst);
} else {
freeColumnFilterInfo(*dst, i);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(pz, (void*)src->pz, (size_t)src->len + 1);
(*dst)[i].pz = (int64_t)pz;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs,
int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -9232,113 +9022,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return TSDB_CODE_SUCCESS;
}
// TODO tag length should be passed from client, refactor
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) {
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); }
CATCH(code) {
CLEANUP_EXECUTE();
return code;
}
END_TRY
if (expr == NULL) {
// qError("failed to create expr tree");
return TSDB_CODE_QRY_APP_ERROR;
}
// int32_t ret = filterInitFromTree(expr, pFilters, 0);
// tExprTreeDestroy(expr, NULL);
// return ret;
}
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo**
// pFilterInfo, uint64_t qId) {
// *pFilterInfo = taosMemoryCalloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols);
// if (*pFilterInfo == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t i = 0, j = 0; i < numOfCols; ++i) {
// if (pCols[i].flist.numOfFilters > 0) {
// SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]);
//
// memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo));
// pFilter->info = pCols[i];
//
// pFilter->numOfFilters = pCols[i].flist.numOfFilters;
// pFilter->pFilters = taosMemoryCalloc(pFilter->numOfFilters, sizeof(SColumnFilterElem));
// if (pFilter->pFilters == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t f = 0; f < pFilter->numOfFilters; ++f) {
// SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f];
// pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f];
//
// int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
// int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
// if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->fp = getFilterOperator(lower, upper);
// if (pSingleColFilter->fp == NULL) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->bytes = pCols[i].bytes;
//
// if (lower == TSDB_RELATION_IN) {
//// buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz),
///(int32_t)(pSingleColFilter->filterInfo.len));
// }
// }
//
// j++;
// }
// }
//
// return TSDB_CODE_SUCCESS;
//}
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
// for (int32_t i = 0; i < numOfFilterCols; ++i) {
// if (pFilterInfo[i].numOfFilters > 0) {
// if (pFilterInfo[i].pFilters->filterInfo.lowerRelOptr == TSDB_RELATION_IN) {
// taosHashCleanup((SHashObj *)(pFilterInfo[i].pFilters->q));
// }
// taosMemoryFreeClear(pFilterInfo[i].pFilters);
// }
// }
//
// taosMemoryFreeClear(pFilterInfo);
return NULL;
}
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
// if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) {
// pQueryAttr->numOfFilterCols++;
// }
}
if (pQueryAttr->numOfFilterCols == 0) {
return TSDB_CODE_SUCCESS;
}
// doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols,
// &pQueryAttr->pFilterInfo, qId);
pQueryAttr->createFilterOperator = true;
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) {
assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);
......
......@@ -3668,7 +3668,6 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
taosArrayPush(pList, &pSrc);
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
taosArrayDestroy(pList);
// TODO Fix it
// *p = output.orig.data;
......
......@@ -275,7 +275,6 @@ TEST(timerangeTest, greater_and_lower) {
nodesDestroyNode(logicNode);
}
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
......@@ -386,7 +385,6 @@ TEST(columnTest, int_column_greater_smallint_value) {
blockDataDestroy(src);
}
TEST(columnTest, int_column_in_double_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
int32_t leftv[5] = {1, 2, 3, 4, 5};
......@@ -432,8 +430,6 @@ TEST(columnTest, int_column_in_double_list) {
blockDataDestroy(src);
}
TEST(columnTest, binary_column_in_binary_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
bool eRes[5] = {true, true, false, false, false};
......@@ -497,7 +493,6 @@ TEST(columnTest, binary_column_in_binary_list) {
blockDataDestroy(src);
}
TEST(columnTest, binary_column_like_binary) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
char rightv[64] = {0};
......@@ -546,7 +541,6 @@ TEST(columnTest, binary_column_like_binary) {
blockDataDestroy(src);
}
TEST(columnTest, binary_column_is_null) {
SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0};
......@@ -641,8 +635,6 @@ TEST(columnTest, binary_column_is_not_null) {
blockDataDestroy(src);
}
TEST(opTest, smallint_column_greater_int_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, -6, -2, 11, 101};
......@@ -680,7 +672,6 @@ TEST(opTest, smallint_column_greater_int_column) {
blockDataDestroy(src);
}
TEST(opTest, smallint_value_add_int_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int32_t leftv = 1;
......@@ -719,8 +710,6 @@ TEST(opTest, smallint_value_add_int_column) {
blockDataDestroy(src);
}
TEST(opTest, bigint_column_multi_binary_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int64_t leftv[5]= {1, 2, 3, 4, 5};
......@@ -845,8 +834,6 @@ TEST(opTest, smallint_column_or_float_column) {
blockDataDestroy(src);
}
TEST(opTest, smallint_column_or_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {0, 2, 3, 0, -1};
......@@ -885,7 +872,6 @@ TEST(opTest, smallint_column_or_double_value) {
blockDataDestroy(src);
}
TEST(opTest, binary_column_is_true) {
SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0};
......@@ -930,7 +916,6 @@ TEST(opTest, binary_column_is_true) {
blockDataDestroy(src);
}
TEST(filterModelogicTest, diff_columns_and_or_and) {
flttInitLogFile();
......@@ -1071,7 +1056,6 @@ TEST(filterModelogicTest, same_column_and_or_and) {
blockDataDestroy(src);
}
TEST(filterModelogicTest, diff_columns_or_and_or) {
SNode *pLeft1 = NULL, *pRight1 = NULL, *pLeft2 = NULL, *pRight2 = NULL, *opNode1 = NULL, *opNode2 = NULL;
SNode *logicNode1 = NULL, *logicNode2 = NULL;
......@@ -1210,8 +1194,6 @@ TEST(filterModelogicTest, same_column_or_and_or) {
blockDataDestroy(src);
}
TEST(scalarModelogicTest, diff_columns_or_and_or) {
flttInitLogFile();
......@@ -1283,8 +1265,6 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
blockDataDestroy(src);
}
int main(int argc, char** argv) {
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册