diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 85814305bd9cce41efebec8bb997f207c8d1a847..82202b8820212fddfe401c769b77003fde2982fc 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -692,7 +692,6 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } -#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -752,9 +751,6 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } - -#if 0 - TEST(testCase, tsbs_perf_test) { TdThread qid[20] = {0}; @@ -764,15 +760,16 @@ TEST(testCase, tsbs_perf_test) { getchar(); } +#endif TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use test"); taos_free_result(pRes); - pRes = taos_query(pConn, "select ts from st1"); + pRes = taos_query(pConn, "select * from meters limit 50000000"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -785,14 +782,15 @@ TEST(testCase, projection_query_stables) { char str[512] = {0}; while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); } taos_free_result(pRes); taos_close(pConn); } +#if 0 TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d9757c96f0e46f7c4aedd4c78833bba5f1ebdcd..d4248fc420462f63a35048bb0643623ec784afe5 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -138,7 +138,9 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); + + int32_t status = updateStatus(pDispatcher); + *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3e3c73797d6e8ca2991e1537e7455308cb02d0a4..2163c16f170ac6e4521bfdb2b22263ff706508e2 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,7 +75,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { + qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -360,7 +362,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); if (pExchangeInfo == NULL) { - qWarn("failed to acquire exchange operator, since it may have been released"); + qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo); taosMemoryFree(pMsg->pData); return TSDB_CODE_SUCCESS; } @@ -379,20 +381,23 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code)); + qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; + code = tsem_post(&pExchangeInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + code = TAOS_SYSTEM_ERROR(code); + qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); + } - tsem_post(&pExchangeInfo->ready); taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); - - return TSDB_CODE_SUCCESS; + return code; } int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { @@ -444,9 +449,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, - pSource->execId, sourceIndex, totalSources); + pSource->execId, pExchangeInfo, sourceIndex, totalSources); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));