提交 7ee4403d 编写于 作者: H Haojun Liao

[td-11818] Return result to query worker

上级 17a26c47
......@@ -44,7 +44,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
* @param handle
* @return
*/
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......
......@@ -3454,6 +3454,7 @@ void filterPrepare(void* expr, void* param) {
}
}
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
......@@ -3537,8 +3538,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1);
// assert((*p)->type == TSDB_CHILD_TABLE);
if (ret == 0) {
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
taosArrayPush(g, &info1);
......@@ -3554,7 +3553,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g);
}
#if 0
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
......@@ -3587,145 +3585,138 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
sup.pTagSchema = pTagSchema;
sup.pCols = pCols;
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
// createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
}
return pTableGroup;
}
static bool tableFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL;
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable);
} else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
}
if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
if (pInfo->optr == TSDB_RELATION_ISNULL) {
return (val == NULL) || isNull(val, pInfo->sch.type);
} else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
return (val != NULL) && (!isNull(val, pInfo->sch.type));
}
} else if (pInfo->optr == TSDB_RELATION_IN) {
int type = pInfo->sch.type;
if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t v;
GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t v;
GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
}
else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
double v;
GET_TYPED_DATA(v, double, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
} else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
}
}
int32_t ret = 0;
if (val == NULL) { //the val is possible to be null, so check it out carefully
ret = -1; // val is missing in table tags value pairs
} else {
ret = pInfo->compare(val, pInfo->q);
}
switch (pInfo->optr) {
case TSDB_RELATION_EQUAL: {
return ret == 0;
}
case TSDB_RELATION_NOT_EQUAL: {
return ret != 0;
}
case TSDB_RELATION_GREATER_EQUAL: {
return ret >= 0;
}
case TSDB_RELATION_GREATER: {
return ret > 0;
}
case TSDB_RELATION_LESS_EQUAL: {
return ret <= 0;
}
case TSDB_RELATION_LESS: {
return ret < 0;
}
case TSDB_RELATION_LIKE: {
return ret == 0;
}
case TSDB_RELATION_MATCH: {
return ret == 0;
}
case TSDB_RELATION_NMATCH: {
return ret == 0;
}
case TSDB_RELATION_IN: {
return ret == 1;
}
default:
assert(false);
}
return true;
}
static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
//static bool tableFilterFp(const void* pNode, void* param) {
// tQueryInfo* pInfo = (tQueryInfo*) param;
//
// STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
// char* val = NULL;
// if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
// val = (char*) TABLE_NAME(pTable);
// } else {
// val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
// }
//
// if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
// if (pInfo->optr == TSDB_RELATION_ISNULL) {
// return (val == NULL) || isNull(val, pInfo->sch.type);
// } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
// return (val != NULL) && (!isNull(val, pInfo->sch.type));
// }
// } else if (pInfo->optr == TSDB_RELATION_IN) {
// int type = pInfo->sch.type;
// if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
// int64_t v;
// GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
// return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
// uint64_t v;
// GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
// return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
// }
// else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
// double v;
// GET_TYPED_DATA(v, double, pInfo->sch.type, val);
// return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
// } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
// return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
// }
//
// }
//
// int32_t ret = 0;
// if (val == NULL) { //the val is possible to be null, so check it out carefully
// ret = -1; // val is missing in table tags value pairs
// } else {
// ret = pInfo->compare(val, pInfo->q);
// }
//
// switch (pInfo->optr) {
// case TSDB_RELATION_EQUAL: {
// return ret == 0;
// }
// case TSDB_RELATION_NOT_EQUAL: {
// return ret != 0;
// }
// case TSDB_RELATION_GREATER_EQUAL: {
// return ret >= 0;
// }
// case TSDB_RELATION_GREATER: {
// return ret > 0;
// }
// case TSDB_RELATION_LESS_EQUAL: {
// return ret <= 0;
// }
// case TSDB_RELATION_LESS: {
// return ret < 0;
// }
// case TSDB_RELATION_LIKE: {
// return ret == 0;
// }
// case TSDB_RELATION_MATCH: {
// return ret == 0;
// }
// case TSDB_RELATION_NMATCH: {
// return ret == 0;
// }
// case TSDB_RELATION_IN: {
// return ret == 1;
// }
//
// default:
// assert(false);
// }
//
// return true;
//}
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// query according to the expression tree
SExprTraverseSupp supp = {
.nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
.setupInfoFn = filterPrepare,
.pExtInfo = pSTable->tagSchema,
};
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeDestroy(pExpr, destroyHelper);
return TSDB_CODE_SUCCESS;
}
//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// // query according to the expression tree
// SExprTraverseSupp supp = {
// .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
// .setupInfoFn = filterPrepare,
// .pExtInfo = pSTable->tagSchema,
// };
//
// getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
// tExprTreeDestroy(pExpr, destroyHelper);
// return TSDB_CODE_SUCCESS;
//}
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId) {
STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
if (pTbCfg == NULL) {
tsdbError("%p failed to get stable, uid:%"PRIu64", reqId:0x%"PRIx64, tsdb, uid, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error;
}
if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId,
pTable->name->data);
terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
tsdbUnlockRepoMeta(tsdb);
if (pTbCfg->type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", reId:0x%"PRIx64, tsdb, uid, reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
goto _error;
}
//NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableList(pTable, res);
assert(false);
int32_t ret = 0;//getAllTableList(pTable, res);
if (ret != TSDB_CODE_SUCCESS) {
tsdbUnlockRepoMeta(tsdb);
goto _error;
}
......@@ -3736,60 +3727,60 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret;
}
int32_t ret = TSDB_CODE_SUCCESS;
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) {
expr = exprTreeFromTableName(tbnameCond);
if (expr == NULL) {
expr = exprTreeFromBinary(pTagCond, len);
} else {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
if (tagExpr != NULL) {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
tExprNode* tbnameExpr = expr;
expr = calloc(1, sizeof(tExprNode));
if (expr == NULL) {
THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
}
expr->nodeType = TSQL_NODE_EXPR;
expr->_node.optr = (uint8_t)tagNameRelType;
expr->_node.pLeft = tagExpr;
expr->_node.pRight = tbnameExpr;
}
}
CLEANUP_EXECUTE();
} CATCH( code ) {
CLEANUP_EXECUTE();
terrno = code;
tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases
goto _error;
// TODO: more error handling
} END_TRY
doQueryTableList(pTable, res, expr);
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret;
// tExprNode* expr = NULL;
//
// TRY(TSDB_MAX_TAG_CONDITIONS) {
// expr = exprTreeFromTableName(tbnameCond);
// if (expr == NULL) {
// expr = exprTreeFromBinary(pTagCond, len);
// } else {
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
// tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
// if (tagExpr != NULL) {
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
// tExprNode* tbnameExpr = expr;
// expr = calloc(1, sizeof(tExprNode));
// if (expr == NULL) {
// THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
// }
// expr->nodeType = TSQL_NODE_EXPR;
// expr->_node.optr = (uint8_t)tagNameRelType;
// expr->_node.pLeft = tagExpr;
// expr->_node.pRight = tbnameExpr;
// }
// }
// CLEANUP_EXECUTE();
//
// } CATCH( code ) {
// CLEANUP_EXECUTE();
// terrno = code;
// tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases
//
// goto _error;
// // TODO: more error handling
// } END_TRY
//
// doQueryTableList(pTable, res, expr);
// pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
// pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
// tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
// pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
// taosArrayDestroy(res);
//
// if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
// return ret;
_error:
return terrno;
}
#if 0
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
......
......@@ -597,7 +597,6 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
......@@ -638,7 +637,8 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
......
......@@ -135,7 +135,7 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
......@@ -144,7 +144,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code;
return NULL;
}
if (pTaskInfo->cost.start == 0) {
......@@ -153,7 +153,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
return pTaskInfo->code;
return NULL;
}
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
......@@ -168,8 +168,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return pTaskInfo->code;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code));
return NULL;
}
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
......@@ -181,36 +182,18 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (handle) {
*handle = pTaskInfo->dsHandle;
}
while(1) {
st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pRes == NULL) { // no results generated yet, abort
dsEndPut(pTaskInfo->dsHandle, pTaskInfo->cost.elapsedTime);
return pTaskInfo->code;
}
st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
bool qcontinue = false;
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &qcontinue);
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
GET_TASKID(pTaskInfo), 0, 0L, 0);
if (!qcontinue) {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo),
0, 0L, 0);
return pTaskInfo->code;
}
}
atomic_store_64(&pTaskInfo->owner, 0);
return pRes;
}
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
......
......@@ -744,7 +744,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true;
DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle);
SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册