未验证 提交 9f0f8c49 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11183 from taosdata/feature/3.0_liaohj

[td-14440] fix bug.
...@@ -187,7 +187,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp ...@@ -187,7 +187,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
return -1; return terrno;
} }
int32_t code = 0; int32_t code = 0;
......
...@@ -362,6 +362,7 @@ typedef struct SSourceDataInfo { ...@@ -362,6 +362,7 @@ typedef struct SSourceDataInfo {
int32_t index; int32_t index;
SRetrieveTableRsp *pRsp; SRetrieveTableRsp *pRsp;
uint64_t totalRows; uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status; EX_SOURCE_STATUS status;
} SSourceDataInfo; } SSourceDataInfo;
......
...@@ -844,8 +844,7 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI ...@@ -844,8 +844,7 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx, static void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
int32_t numOfOutput, int32_t* rowCellInfoOffset);
static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan, static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan,
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
...@@ -863,7 +862,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_ ...@@ -863,7 +862,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_
// set time window for current result // set time window for current result
pResultRow->win = (*win); pResultRow->win = (*win);
*pResult = pResultRow; *pResult = pResultRow;
setResultRowOutputBufInitCtx_rv(pAggSup->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1046,7 +1045,9 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) { ...@@ -1046,7 +1045,9 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) {
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function
updateTimeWindowInfo(pTimeWindowData, pWin); if (pTimeWindowData != NULL) {
updateTimeWindowInfo(pTimeWindowData, pWin);
}
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey; pCtx[k].startTs = pWin->skey;
...@@ -1894,9 +1895,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -1894,9 +1895,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); /*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -2018,12 +2017,11 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo ...@@ -2018,12 +2017,11 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo
SqlFunctionCtx* pCtx = binfo->pCtx; SqlFunctionCtx* pCtx = binfo->pCtx;
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId, SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
pTaskInfo, true, pAggSup); pTaskInfo, false, pAggSup);
assert(pResultRow != NULL); assert(pResultRow != NULL);
setResultRowKey(pResultRow, pData, type); setResultRowKey(pResultRow, pData, type);
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2178,8 +2176,8 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num ...@@ -2178,8 +2176,8 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
} }
} }
pCtx->resDataInfo.interBufSize = env.calcMemSize; pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) { } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) {
} else if (pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) { pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element.
} }
pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.numOfInputCols = pFunct->numOfParams;
...@@ -3775,8 +3773,7 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes ...@@ -3775,8 +3773,7 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes
} }
} }
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx, void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
...@@ -3789,18 +3786,13 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S ...@@ -3789,18 +3786,13 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
continue; continue;
} }
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
// }
if (!pResInfo->initialized && pCtx[i].functionId != -1) { if (!pResInfo->initialized && pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo); pCtx[i].fpSet.init(&pCtx[i], pResInfo);
} }
} }
} }
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0; int64_t uid = 0;
int64_t tid = 0; int64_t tid = 0;
...@@ -3819,14 +3811,13 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i ...@@ -3819,14 +3811,13 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize);
addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return; return;
} }
} }
setResultRowOutputBufInitCtx_rv(pAggInfo->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
} }
void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo, void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo,
...@@ -5013,12 +5004,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -5013,12 +5004,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param; SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
pSourceDataInfo->pRsp = pMsg->pData; if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->compLen = htonl(pRsp->compLen);
pRsp->compLen = htonl(pRsp->compLen); pRsp->useconds = htobe64(pRsp->useconds);
} else {
pSourceDataInfo->code = code;
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready); tsem_post(&pSourceDataInfo->pEx->ready);
...@@ -5274,7 +5269,6 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -5274,7 +5269,6 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) {
totalSources, endTs - startTs); totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} }
...@@ -5318,18 +5312,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -5318,18 +5312,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
} }
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, tstrerror(pDataInfo->code));
pOperator->pTaskInfo->code = pDataInfo->code;
return NULL;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->totalRows, pLoadInfo->totalRows);
......
...@@ -142,68 +142,68 @@ if $row != 10 then ...@@ -142,68 +142,68 @@ if $row != 10 then
return -1 return -1
endi endi
if $data00 != @2022-01-01 00:00:00.000@ then if $data00 != @22-01-01 00:00:00.000@ then
return -1 return -1
endi endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data90 != @2022-01-01 00:00:00.009@ then if $data90 != @22-01-01 00:00:00.009@ then
return -1 return -1
endi endi
if $data91 != 9 then if $data91 != 9 then
return -1 return -1
endi endi
print ============> filter not supported yet.
sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1; #sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
if $row != 20 then #if $row != 20 then
return -1 # return -1
endi #endi
#
if $data00 != 0 then #if $data00 != 0 then
return -1 # return -1
endi #endi
#
if $data01 != 0 then #if $data01 != 0 then
return -1 # return -1
endi #endi
#
print $data02 #print $data02
if $data02 != 0.000000000 then #if $data02 != 0.000000000 then
return -1 # return -1
endi #endi
#
if $data03 != 0 then #if $data03 != 0 then
return -1 # return -1
endi #endi
#
print $data04 #print $data04
if $data04 != 0.00000 then #if $data04 != 0.00000 then
return -1 # return -1
endi #endi
#
if $data10 != 100 then #if $data10 != 100 then
return -1 # return -1
endi #endi
#
if $data11 != 1 then #if $data11 != 1 then
return -1 # return -1
endi #endi
#
print $data12 #print $data12
if $data12 != 1.000000000 then #if $data12 != 1.000000000 then
return -1 # return -1
endi #endi
#
if $data13 != 1 then #if $data13 != 1 then
return -1 # return -1
endi #endi
#
if $data14 != 1.00000 then #if $data14 != 1.00000 then
print expect 1.00000, actual:$data14 # print expect 1.00000, actual:$data14
return -1 # return -1
endi #endi
sql_error select sum(c1), ts, c1 from group_tb0 where c1<20 group by c1; sql_error select sum(c1), ts, c1 from group_tb0 where c1<20 group by c1;
sql_error select first(ts), ts, c2 from group_tb0 where c1 < 20 group by c1; sql_error select first(ts), ts, c2 from group_tb0 where c1 < 20 group by c1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册