提交 0718859c 编写于 作者: D dapan1121

enh: support passing params between nodes

上级 c85bbef2
......@@ -1842,13 +1842,32 @@ typedef struct {
int32_t tversion;
} SResReadyRsp;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
} SOperatorSpecParam;
typedef struct SOperatorBaseParam {
SOperatorParam* pChild;
} SOperatorBaseParam;
typedef struct SOperatorParam {
SArray* pOpParams; //SArray<SOperatorSpecParam>
} SOperatorParam;
typedef struct STableScanOperatorParam {
SOperatorParam* pChild;
SArray* pUidList;
} STableScanOperatorParam;
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int32_t execId;
void* opParam;
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int32_t execId;
SOperatorParam* pOpParam;
} SResFetchReq;
int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
......
......@@ -132,6 +132,8 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
*/
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
bool qIsDynamicExecTask(qTaskInfo_t tinfo);
/**
* Create the exec task object according to task json
* @param readHandle
......
......@@ -5499,6 +5499,78 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
taosMemoryFreeClear(pReq->msg);
}
int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
int32_t n = taosArrayGetSize(pOpParam->pOpParams);
if (tEncodeI32(pEncoder, n) < 0) return -1;
for (int32_t i = 0; i < n; ++i) {
SOperatorSpecParam* pSpec = (SOperatorSpecParam*)taosArrayGet(pOpParam->pOpParams, i);
if (tEncodeI32(pEncoder, pSpec->opType) < 0) return -1;
switch (pSpec->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pSpec->value;
if (pScan->pChild) {
if (tSerializeSOperatorParam(pEncoder, pScan->pChild) < 0) return -1;
} else {
if (tEncodeI32(pEncoder, 0) < 0) return -1;
}
int32_t uidNum = taosArrayGetSize(pScan->pUidList);
if (tEncodeI32(pEncoder, uidNum) < 0) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
int64_t* pUid = taosArrayGet(pScan->pUidList, m);
if (tEncodeI64(pEncoder, *pUid) < 0) return -1;
}
break;
}
default:
return TSDB_CODE_INVALID_PARA;
}
}
return 0;
}
int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) {
pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam))
if (NULL == pOpParam->pOpParams) return -1;
SOperatorSpecParam specParam;
for (int32_t i = 0; i < specNum; ++i) {
if (tDecodeI32(pDecoder, &specParam.opType) < 0) return -1;
switch (specParam.opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
if (NULL == pScan) return -1;
int32_t childSpecNum = 0;
if (tDecodeI32(pDecoder, &childSpecNum) < 0) return -1;
if (childSpecNum > 0) {
pScan->pChild = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == pScan->pChild) return -1;
if (tDeserializeSOperatorParam(pDecoder, pScan->pChild, childSpecNum) < 0) return -1;
}
int32_t uidNum = 0;
int64_t uid = 0;
if (tDecodeI32(pDecoder, &uidNum) < 0) return -1;
if (uidNum > 0) {
pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t));
if (NULL == pScan->pUidList) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
if (tDecodeI64(pDecoder, &uid) < 0) return -1;
taosArrayPush(pScan->pUidList, &uid);
}
}
specParam.value = pScan;
break;
}
default:
return TSDB_CODE_INVALID_PARA;
}
taosArrayPush(pOpParam->pOpParams, &specParam);
}
return 0;
}
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......@@ -5514,6 +5586,11 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
if (pReq->pOpParam) {
if (tSerializeSOperatorParam(&encoder, pReq->pOpParam) < 0) return -1;
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -5546,6 +5623,14 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
int32_t specNum = 0;
if (tDecodeI32(&decoder, &specNum) < 0) return -1;
if (specNum > 0) {
pReq->pOpParam = taosMemoryMalloc(sizeof(*pReq->pOpParam));
if (NULL == pReq->pOpParam) return -1;
if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam, specNum) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -146,6 +146,7 @@ typedef struct SSortMergeJoinOperatorParam {
typedef struct SExchangeOperatorParam {
SOperatorParam* pChild;
int32_t vgId;
int32_t srcOpType;
SArray* uidList;
} SExchangeOperatorParam;
......
......@@ -27,19 +27,6 @@ typedef struct SOperatorCostInfo {
struct SOperatorInfo;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
} SOperatorSpecParam;
typedef struct SOperatorBaseParam {
SOperatorParam* pChild;
} SOperatorBaseParam;
typedef struct SOperatorParam {
SArray* pOpParams; //SArray<SOperatorSpecParam>
} SOperatorParam;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
......
......@@ -94,6 +94,9 @@ struct SExecTaskInfo {
STaskStopInfo stopInfo;
SRWLatch lock; // secure the access of STableListInfo
SStorageAPI storageAPI;
int8_t dynamicTask;
SOperatorParam* pOpParam;
bool paramSet;
};
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
......
......@@ -49,6 +49,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
return TSDB_CODE_OUT_OF_MEMORY;
}
pGc->pChild = pChild;
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
pGc->downstreamIdx = downstreamIdx;
pGc->needCache = needCache;
......
......@@ -40,7 +40,8 @@ typedef struct SSourceDataInfo {
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
SArray* pUidList;
SArray* pSrcUidList;
int32_t srcOpType;
} SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param);
......@@ -416,6 +417,37 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->pChild = NULL;
pScan->pUidList = taosArrayDup(pUidList, NULL);
if (NULL == pScan->pUidList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SOperatorSpecParam specParam;
specParam.opType = srcOpType;
specParam.value = pScan;
taosArrayPush((*ppRes)->pOpParams, &specParam);
return TSDB_CODE_SUCCESS;
}
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
......@@ -445,10 +477,15 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
req.taskId = pSource->taskId;
req.queryId = pTaskInfo->id.queryId;
req.execId = pSource->execId;
if (pDataInfo->pUidList) {
req.opParam = buildTableScanOperatorParam(pDataInfo->pUidList);
if (pDataInfo->pSrcUidList) {
int32_t code = buildTableScanOperatorParam(&req.opParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
taosMemoryFree(pWrapper);
return pTaskInfo->code;
}
}
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -726,7 +763,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
dataInfo.index = *pIdx;
dataInfo.pUidList = taosArrayDup(pParam->uidList, NULL);
dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL);
dataInfo.srcOpType = pParam->srcOpType;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
}
......
......@@ -512,6 +512,16 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
return 0;
}
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
return ((SExecTaskInfo*)tinfo)->dynamicTask;
}
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam);
((SExecTaskInfo*)tinfo)->pOpParam = pParam;
((SExecTaskInfo*)tinfo)->paramSet = false;
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......@@ -602,8 +612,15 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
int64_t st = taosGetTimestampUs();
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
pTaskInfo->paramSet = true;
pRes = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam);
} else {
pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
}
int32_t blockIndex = 0;
while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
while (pRes != NULL) {
SSDataBlock* p = NULL;
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
SSDataBlock* p1 = createOneDataBlock(pRes, true);
......@@ -623,6 +640,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (current >= 4096) {
break;
}
pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
}
*hasMore = (pRes != NULL);
......
......@@ -96,7 +96,7 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : NULL);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
......@@ -292,7 +292,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
SOperatorInfo* pOperator = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
// NOTE: this is an patch to fix the physical plan
// TODO remove it later
if (pTableScanNode->scan.node.pLimit != NULL) {
......@@ -300,21 +299,25 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
}
STableListInfo* pTableListInfo = tableListCreate();
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
int32_t code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL;
}
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return NULL;
if (pTableScanNode->scan.node.dynamicOp) {
pTaskInfo->dynamicTask = true;
} else {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL;
}
}
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
......@@ -659,4 +662,10 @@ FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOper
return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
}
void destroyOperatorParam(SOperatorParam* pParam) {
if (NULL == pParam) {
return;
}
}
......@@ -784,11 +784,23 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
return NULL;
}
static int32_t createTableListInfoFromParam(STableScanInfo* pInfo, STableScanOperatorParam* pParam) {
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
if (pOperator->pOperatorParam) {
int32_t code = createTableListInfoFromParam(pInfo, (STableScanOperatorParam*)pOperator->pOperatorParam);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
// scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo);
......
......@@ -512,7 +512,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int64_t rId = 0;
int32_t eId = req.execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
SQWMsg qwMsg = {.node = node, .msg = req.pOpParam, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
......
......@@ -401,7 +401,28 @@ _return:
QW_RET(code);
}
int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
SQWTaskCtx octx;
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) {
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
}
if (!qIsDynamicExecTask(ctx->taskHandle)) {
return TSDB_CODE_SUCCESS;
}
qwHandleTaskComplete(QW_FPARAMS_DEF, ctx);
return TSDB_CODE_SUCCESS;
}
int32_t qwDropTask(QW_FPARAMS_DEF) {
QW_ERR_RET(qwHandleDynamicTaskEnd(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
......
......@@ -203,15 +203,24 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
}
if (numOfResBlock == 0 || (hasMore == false)) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
if (!qIsDynamicExecTask(taskHandle)) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds);
}
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
} else {
QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds);
if (numOfResBlock == 0) {
QW_TASK_DLOG("dyn task qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
QW_TASK_DLOG("dyn task qExecTask done, useconds:%" PRIu64, useconds);
}
}
dsEndPut(sinkHandle, useconds);
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) {
*queryStop = true;
}
......@@ -729,8 +738,11 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
if (!qIsDynamicExecTask(pTaskInfo)) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
}
_return:
taosMemoryFree(sql);
......@@ -849,6 +861,10 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->fetchMsgType = qwMsg->msgType;
ctx->dataConnInfo = qwMsg->connInfo;
if (qwMsg->msg) {
qUpdateOperatorParam(ctx->taskHandle);
}
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册