未验证 提交 0916126c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9874 from taosdata/feature/qnode2

Feature/qnode2
...@@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH ...@@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
*/ */
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
void dsEndPut(DataSinkHandle handle, int64_t useconds); void dsEndPut(DataSinkHandle handle, uint64_t useconds);
/** /**
* Get the length of the data returned by the next call to dsGetDataBlock. * Get the length of the data returned by the next call to dsGetDataBlock.
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "common.h"
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
struct SSubplan; struct SSubplan;
...@@ -34,7 +36,7 @@ struct SSubplan; ...@@ -34,7 +36,7 @@ struct SSubplan;
* @param qId * @param qId
* @return * @return
*/ */
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/** /**
* The main task execution function, including query on both table and multiple tables, * The main task execution function, including query on both table and multiple tables,
...@@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI ...@@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
* @param handle * @param handle
* @return * @return
*/ */
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
......
...@@ -3454,6 +3454,7 @@ void filterPrepare(void* expr, void* param) { ...@@ -3454,6 +3454,7 @@ void filterPrepare(void* expr, void* param) {
} }
} }
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = ((STableKeyInfo*) p1)->pTable; STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
...@@ -3537,8 +3538,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable ...@@ -3537,8 +3538,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
int32_t ret = compareFn(prev, p, pSupp); int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1); assert(ret == 0 || ret == -1);
// assert((*p)->type == TSDB_CHILD_TABLE);
if (ret == 0) { if (ret == 0) {
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey}; STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
taosArrayPush(g, &info1); taosArrayPush(g, &info1);
...@@ -3554,7 +3553,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable ...@@ -3554,7 +3553,6 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g); taosArrayPush(pGroups, &g);
} }
#if 0
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
...@@ -3587,145 +3585,138 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -3587,145 +3585,138 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
sup.pTagSchema = pTagSchema; sup.pTagSchema = pTagSchema;
sup.pCols = pCols; sup.pCols = pCols;
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn); // taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn); // createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
} }
return pTableGroup; return pTableGroup;
} }
static bool tableFilterFp(const void* pNode, void* param) { //static bool tableFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param; // tQueryInfo* pInfo = (tQueryInfo*) param;
//
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); // STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
char* val = NULL; // char* val = NULL;
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { // if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable); // val = (char*) TABLE_NAME(pTable);
} else { // } else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId); // val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
} // }
//
if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) { // if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
if (pInfo->optr == TSDB_RELATION_ISNULL) { // if (pInfo->optr == TSDB_RELATION_ISNULL) {
return (val == NULL) || isNull(val, pInfo->sch.type); // return (val == NULL) || isNull(val, pInfo->sch.type);
} else if (pInfo->optr == TSDB_RELATION_NOTNULL) { // } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
return (val != NULL) && (!isNull(val, pInfo->sch.type)); // return (val != NULL) && (!isNull(val, pInfo->sch.type));
} // }
} else if (pInfo->optr == TSDB_RELATION_IN) { // } else if (pInfo->optr == TSDB_RELATION_IN) {
int type = pInfo->sch.type; // int type = pInfo->sch.type;
if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) { // if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t v; // int64_t v;
GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val); // GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v)); // return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { // } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t v; // uint64_t v;
GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val); // GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v)); // return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
} // }
else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { // else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
double v; // double v;
GET_TYPED_DATA(v, double, pInfo->sch.type, val); // GET_TYPED_DATA(v, double, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v)); // return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
} else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){ // } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val)); // return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
} // }
//
} // }
//
int32_t ret = 0; // int32_t ret = 0;
if (val == NULL) { //the val is possible to be null, so check it out carefully // if (val == NULL) { //the val is possible to be null, so check it out carefully
ret = -1; // val is missing in table tags value pairs // ret = -1; // val is missing in table tags value pairs
} else { // } else {
ret = pInfo->compare(val, pInfo->q); // ret = pInfo->compare(val, pInfo->q);
} // }
//
switch (pInfo->optr) { // switch (pInfo->optr) {
case TSDB_RELATION_EQUAL: { // case TSDB_RELATION_EQUAL: {
return ret == 0; // return ret == 0;
} // }
case TSDB_RELATION_NOT_EQUAL: { // case TSDB_RELATION_NOT_EQUAL: {
return ret != 0; // return ret != 0;
} // }
case TSDB_RELATION_GREATER_EQUAL: { // case TSDB_RELATION_GREATER_EQUAL: {
return ret >= 0; // return ret >= 0;
} // }
case TSDB_RELATION_GREATER: { // case TSDB_RELATION_GREATER: {
return ret > 0; // return ret > 0;
} // }
case TSDB_RELATION_LESS_EQUAL: { // case TSDB_RELATION_LESS_EQUAL: {
return ret <= 0; // return ret <= 0;
} // }
case TSDB_RELATION_LESS: { // case TSDB_RELATION_LESS: {
return ret < 0; // return ret < 0;
} // }
case TSDB_RELATION_LIKE: { // case TSDB_RELATION_LIKE: {
return ret == 0; // return ret == 0;
} // }
case TSDB_RELATION_MATCH: { // case TSDB_RELATION_MATCH: {
return ret == 0; // return ret == 0;
} // }
case TSDB_RELATION_NMATCH: { // case TSDB_RELATION_NMATCH: {
return ret == 0; // return ret == 0;
} // }
case TSDB_RELATION_IN: { // case TSDB_RELATION_IN: {
return ret == 1; // return ret == 1;
} // }
//
default: // default:
assert(false); // assert(false);
} // }
//
return true; // return true;
} //}
static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) { //static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
// query according to the expression tree
SExprTraverseSupp supp = {
.nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
.setupInfoFn = filterPrepare,
.pExtInfo = pSTable->tagSchema,
};
getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp); //static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
tExprTreeDestroy(pExpr, destroyHelper); // // query according to the expression tree
return TSDB_CODE_SUCCESS; // SExprTraverseSupp supp = {
} // .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
// .setupInfoFn = filterPrepare,
// .pExtInfo = pSTable->tagSchema,
// };
//
// getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
// tExprTreeDestroy(pExpr, destroyHelper);
// return TSDB_CODE_SUCCESS;
//}
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) { SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
if (pTbCfg == NULL) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); tsdbError("%p failed to get stable, uid:%"PRIu64", reqId:0x%"PRIx64, tsdb, uid, reqId);
if (pTable == NULL) {
tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error; goto _error;
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTbCfg->type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId, tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", reId:0x%"PRIx64, tsdb, uid, reqId);
pTable->name->data); terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
tsdbUnlockRepoMeta(tsdb);
goto _error; goto _error;
} }
//NOTE: not add ref count for super table //NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableList(pTable, res); assert(false);
int32_t ret = 0;//getAllTableList(pTable, res);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tsdbUnlockRepoMeta(tsdb);
goto _error; goto _error;
} }
...@@ -3736,60 +3727,60 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch ...@@ -3736,60 +3727,60 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res); taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret; return ret;
} }
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
tExprNode* expr = NULL; // tExprNode* expr = NULL;
//
TRY(TSDB_MAX_TAG_CONDITIONS) { // TRY(TSDB_MAX_TAG_CONDITIONS) {
expr = exprTreeFromTableName(tbnameCond); // expr = exprTreeFromTableName(tbnameCond);
if (expr == NULL) { // if (expr == NULL) {
expr = exprTreeFromBinary(pTagCond, len); // expr = exprTreeFromBinary(pTagCond, len);
} else { // } else {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL); // CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); // tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
if (tagExpr != NULL) { // if (tagExpr != NULL) {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL); // CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
tExprNode* tbnameExpr = expr; // tExprNode* tbnameExpr = expr;
expr = calloc(1, sizeof(tExprNode)); // expr = calloc(1, sizeof(tExprNode));
if (expr == NULL) { // if (expr == NULL) {
THROW( TSDB_CODE_TDB_OUT_OF_MEMORY ); // THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
} // }
expr->nodeType = TSQL_NODE_EXPR; // expr->nodeType = TSQL_NODE_EXPR;
expr->_node.optr = (uint8_t)tagNameRelType; // expr->_node.optr = (uint8_t)tagNameRelType;
expr->_node.pLeft = tagExpr; // expr->_node.pLeft = tagExpr;
expr->_node.pRight = tbnameExpr; // expr->_node.pRight = tbnameExpr;
} // }
} // }
CLEANUP_EXECUTE(); // CLEANUP_EXECUTE();
//
} CATCH( code ) { // } CATCH( code ) {
CLEANUP_EXECUTE(); // CLEANUP_EXECUTE();
terrno = code; // terrno = code;
tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases // tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases
//
goto _error; // goto _error;
// TODO: more error handling // // TODO: more error handling
} END_TRY // } END_TRY
//
doQueryTableList(pTable, res, expr); // doQueryTableList(pTable, res, expr);
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); // pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); // pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId, // tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); // pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
taosArrayDestroy(res); // taosArrayDestroy(res);
//
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; // if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret; // return ret;
_error: _error:
return terrno; return terrno;
} }
#if 0
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
......
...@@ -32,7 +32,7 @@ typedef struct SDataSinkManager { ...@@ -32,7 +32,7 @@ typedef struct SDataSinkManager {
} SDataSinkManager; } SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
......
...@@ -597,7 +597,6 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); ...@@ -597,7 +597,6 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(STaskParam *param); void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
...@@ -638,7 +637,8 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); ...@@ -638,7 +637,8 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo); void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code); void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo); void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo); void queryCostStatis(SQInfo *pQInfo);
......
...@@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle { ...@@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle {
SDataDispatchBuf nextOutput; SDataDispatchBuf nextOutput;
int32_t status; int32_t status;
bool queryEnd; bool queryEnd;
int64_t useconds; uint64_t useconds;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SDataDispatchHandle; } SDataDispatchHandle;
...@@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, ...@@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex); pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->queryEnd = true; pDispatcher->queryEnd = true;
......
...@@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC ...@@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
} }
void dsEndPut(DataSinkHandle handle, int64_t useconds) { void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl, useconds); return pHandleImpl->fEndPut(pHandleImpl, useconds);
} }
......
...@@ -68,7 +68,7 @@ void freeParam(STaskParam *param) { ...@@ -68,7 +68,7 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(tsdb != NULL && pSubplan != NULL); assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
...@@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ ...@@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
*handle = (*pTask)->dsHandle;
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
...@@ -135,10 +137,12 @@ int waitMoment(SQInfo* pQInfo){ ...@@ -135,10 +137,12 @@ int waitMoment(SQInfo* pQInfo){
} }
#endif #endif
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId(); int64_t threadId = taosGetSelfPthreadId();
*pRes = NULL;
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
...@@ -153,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -153,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
return pTaskInfo->code; return TSDB_CODE_SUCCESS;
} }
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
...@@ -168,7 +172,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -168,7 +172,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret); publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret; pTaskInfo->code = ret;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code));
return pTaskInfo->code; return pTaskInfo->code;
} }
...@@ -178,39 +183,21 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -178,39 +183,21 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0; int64_t st = 0;
if (handle) {
*handle = pTaskInfo->dsHandle;
}
while(1) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pRes == NULL) { // no results generated yet, abort if (NULL == *pRes) {
dsEndPut(pTaskInfo->dsHandle, pTaskInfo->cost.elapsedTime); *useconds = pTaskInfo->cost.elapsedTime;
return pTaskInfo->code;
} }
bool qcontinue = false; qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; GET_TASKID(pTaskInfo), 0, 0L, 0);
pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &qcontinue);
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
if (!qcontinue) { atomic_store_64(&pTaskInfo->owner, 0);
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo),
0, 0L, 0);
return pTaskInfo->code; return pTaskInfo->code;
}
}
} }
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
......
...@@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) { ...@@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) {
"}"; "}";
SExecTaskInfo* pTaskInfo = nullptr; SExecTaskInfo* pTaskInfo = nullptr;
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo); DataSinkHandle sinkHandle = nullptr;
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
\ No newline at end of file
...@@ -458,6 +458,37 @@ _return: ...@@ -458,6 +458,37 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
int32_t code = 0;
bool qcontinue = true;
SSDataBlock* pRes = NULL;
uint64_t useconds = 0;
while (qcontinue) {
code = qExecTask(taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (NULL == pRes) {
QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
break;
}
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code);
QW_ERR_JRET(code);
}
}
_return:
QW_RET(code);
}
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
...@@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo); DataSinkHandle sinkHandle = NULL;
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
...@@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true; queryRsped = true;
DataSinkHandle sinkHandle = NULL; QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
code = qExecTask(pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
_return: _return:
...@@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle; DataSinkHandle sinkHandle = ctx->sinkHandle;
code = qExecTask(taskHandle, &sinkHandle); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle));
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
......
...@@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { ...@@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
++addNum;
} }
} }
...@@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
} }
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
if (rsp) {
pJob->resNumOfRows += rsp->affectedRows;
}
#endif #endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
...@@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru ...@@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
SSchJob *job = NULL; SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true)); SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
*pJob = job; job = *pJob;
pRes->code = atomic_load_32(&job->errCode); pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows; pRes->numOfRows = job->resNumOfRows;
......
...@@ -34,10 +34,12 @@ ...@@ -34,10 +34,12 @@
#include "stub.h" #include "stub.h"
#include "addr_any.h" #include "addr_any.h"
namespace { namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtInitLogFile() { void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog"; const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10; const int32_t maxLogFileNum = 10;
...@@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) {
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(1, POINTER_BYTES); dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SSubplan insertPlan[2] = {0}; SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
insertPlan[0].id.queryId = qId; insertPlan[0].id.queryId = qId;
insertPlan[0].id.templateId = 0x0000000000000003; insertPlan[0].id.templateId = 0x0000000000000003;
...@@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].pParents = NULL; insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL; insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
insertPlan[0].msgType = TDMT_VND_SUBMIT;
insertPlan[1].id.queryId = qId; insertPlan[1].id.queryId = qId;
insertPlan[1].id.templateId = 0x0000000000000003; insertPlan[1].id.templateId = 0x0000000000000003;
...@@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].pParents = NULL; insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL; insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
insertPlan[1].msgType = TDMT_VND_SUBMIT;
taosArrayPush(inserta, &insertPlan);
taosArrayPush(inserta, &insertPlan[0]); insertPlan += 1;
taosArrayPush(inserta, &insertPlan[1]); taosArrayPush(inserta, &insertPlan);
taosArrayPush(dag->pSubplans, &inserta); taosArrayPush(dag->pSubplans, &inserta);
} }
...@@ -210,6 +214,24 @@ void schtSetRpcSendRequest() { ...@@ -210,6 +214,24 @@ void schtSetRpcSendRequest() {
} }
} }
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return 0;
}
void schtSetAsyncSendMsgToServer() {
static Stub stub;
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
for (const auto& f : result) {
stub.set(f.second, schtAsyncSendMsgToServer);
}
}
}
void *schtSendRsp(void *param) { void *schtSendRsp(void *param) {
SSchJob *job = NULL; SSchJob *job = NULL;
...@@ -230,7 +252,7 @@ void *schtSendRsp(void *param) { ...@@ -230,7 +252,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRsp rsp = {0}; SShellSubmitRsp rsp = {0};
rsp.affectedRows = 10; rsp.affectedRows = 10;
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
} }
...@@ -238,6 +260,23 @@ void *schtSendRsp(void *param) { ...@@ -238,6 +260,23 @@ void *schtSendRsp(void *param) {
return NULL; return NULL;
} }
void *schtCreateFetchRspThread(void *param) {
struct SSchJob* job = (struct SSchJob*)param;
sleep(1);
int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0);
assert(code == 0);
}
struct SSchJob *pInsertJob = NULL; struct SSchJob *pInsertJob = NULL;
} }
...@@ -266,6 +305,7 @@ TEST(queryTest, normalCase) { ...@@ -266,6 +305,7 @@ TEST(queryTest, normalCase) {
schtSetPlanToString(); schtSetPlanToString();
schtSetExecNode(); schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -276,7 +316,7 @@ TEST(queryTest, normalCase) { ...@@ -276,7 +316,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -287,8 +327,8 @@ TEST(queryTest, normalCase) { ...@@ -287,8 +327,8 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
} }
...@@ -298,7 +338,7 @@ TEST(queryTest, normalCase) { ...@@ -298,7 +338,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -309,22 +349,19 @@ TEST(queryTest, normalCase) { ...@@ -309,22 +349,19 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
} }
SRetrieveTableRsp rsp = {0}; pthread_attr_t thattr;
rsp.completed = 1; pthread_attr_init(&thattr);
rsp.numOfRows = 10;
code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
void *data = NULL; void *data = NULL;
code = scheduleFetchRows(job, &data); code = scheduleFetchRows(job, &data);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -340,6 +377,8 @@ TEST(queryTest, normalCase) { ...@@ -340,6 +377,8 @@ TEST(queryTest, normalCase) {
scheduleFreeJob(pJob); scheduleFreeJob(pJob);
schtFreeQueryDag(&dag); schtFreeQueryDag(&dag);
schedulerDestroy();
} }
...@@ -369,6 +408,7 @@ TEST(insertTest, normalCase) { ...@@ -369,6 +408,7 @@ TEST(insertTest, normalCase) {
schtBuildInsertDag(&dag); schtBuildInsertDag(&dag);
schtSetPlanToString(); schtSetPlanToString();
schtSetAsyncSendMsgToServer();
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
...@@ -382,6 +422,8 @@ TEST(insertTest, normalCase) { ...@@ -382,6 +422,8 @@ TEST(insertTest, normalCase) {
ASSERT_EQ(res.numOfRows, 20); ASSERT_EQ(res.numOfRows, 20);
scheduleFreeJob(pInsertJob); scheduleFreeJob(pInsertJob);
schedulerDestroy();
} }
TEST(multiThread, forceFree) { TEST(multiThread, forceFree) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册