提交 0d4fb5bb 编写于 作者: D dapan1121

feat: support insert from query

上级 2d6ddba3
...@@ -246,6 +246,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -246,6 +246,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
} }
......
...@@ -1510,6 +1510,7 @@ typedef struct SSubQueryMsg { ...@@ -1510,6 +1510,7 @@ typedef struct SSubQueryMsg {
int32_t execId; int32_t execId;
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch;
uint32_t sqlLen; // the query sql, uint32_t sqlLen; // the query sql,
uint32_t phyLen; uint32_t phyLen;
char msg[]; char msg[];
......
...@@ -45,6 +45,10 @@ typedef struct SDeleterParam { ...@@ -45,6 +45,10 @@ typedef struct SDeleterParam {
SArray* pUidList; SArray* pUidList;
} SDeleterParam; } SDeleterParam;
typedef struct SInserterParam {
SReadHandle* readHandle;
} SInserterParam;
typedef struct SDataSinkStat { typedef struct SDataSinkStat {
uint64_t cachedSize; uint64_t cachedSize;
} SDataSinkStat; } SDataSinkStat;
......
...@@ -155,7 +155,7 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); ...@@ -155,7 +155,7 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/ */
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes);
......
...@@ -74,7 +74,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in ...@@ -74,7 +74,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
......
...@@ -2105,3 +2105,76 @@ const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRow ...@@ -2105,3 +2105,76 @@ const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRow
ASSERT(pStart - pData == dataLen); ASSERT(pStart - pData == dataLen);
return pStart; return pStart;
} }
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) {
SSubmitReq* ret = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
// cal size
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
}
// assign data
// TODO
ret = rpcMallocCont(cap);
ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);
blkHead->uid = htobe64(uid);
blkHead->schemaLen = htonl(0);
int32_t rows = pDataBlock->info.rows;
int32_t dataLen = 0;
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
}
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
}
ret->length = htonl(ret->length);
return ret;
}
...@@ -88,7 +88,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -88,7 +88,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); case TDMT_VND_SUBMIT_RSP:
qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
return; return;
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
if (pEpSet != NULL) { if (pEpSet != NULL) {
......
...@@ -137,6 +137,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDat ...@@ -137,6 +137,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDat
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
// tq // tq
......
...@@ -3777,3 +3777,39 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { ...@@ -3777,3 +3777,39 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
taosMemoryFree(pTsdbReadHandle->pSchema); taosMemoryFree(pTsdbReadHandle->pSchema);
taosMemoryFreeClear(pTsdbReadHandle); taosMemoryFreeClear(pTsdbReadHandle);
} }
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) {
int32_t sversion = 1;
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr);
return terrno;
}
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr);
return terrno;
}
sversion = mr.me.stbEntry.schemaRow.version;
} else {
ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
sversion = mr.me.ntbEntry.schemaRow.version;
}
metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
return TSDB_CODE_SUCCESS;
}
...@@ -855,7 +855,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); ...@@ -855,7 +855,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model); const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo); int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum); int32_t* resNum);
......
...@@ -24,195 +24,153 @@ ...@@ -24,195 +24,153 @@
extern SDataSinkStat gDataSinkStat; extern SDataSinkStat gDataSinkStat;
typedef struct SDataInserterBuf { typedef struct SSubmitRes {
int32_t useSize; int64_t affectedRows;
int32_t allocSize; int32_t code;
char* pData; SSubmitRsp *pRsp;
} SDataInserterBuf; } SSubmitRes;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataInserterHandle { typedef struct SDataInserterHandle {
SDataSinkHandle sink; SDataSinkHandle sink;
SDataSinkManager* pManager; SDataSinkManager* pManager;
SDataBlockDescNode* pSchema; STSchema* pSchema;
SDataDeleterNode* pDeleter; SQueryInserterNode* pNode;
SDeleterParam* pParam; SSubmitRes submitRes;
STaosQueue* pDataBlocks; SInserterParam* pParam;
SDataInserterBuf nextOutput; SArray* pDataBlocks;
int32_t status; int32_t status;
bool queryEnd; bool queryEnd;
uint64_t useconds; uint64_t useconds;
uint64_t cachedSize; uint64_t cachedSize;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t ready;
} SDataInserterHandle; } SDataInserterHandle;
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { typedef struct SSubmitRspParam {
if (tsCompressColData < 0 || 0 == pData->info.rows) { SDataInserterHandle* pInserter;
return false; } SSubmitRspParam;
}
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false; static int32_t updateStatus(SDataInserterHandle* pInserter) {
taosThreadMutexLock(&pInserter->mutex);
int32_t blockNums = taosQueueItemSize(pInserter->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pInserter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pInserter->status = status;
taosThreadMutexUnlock(&pInserter->mutex);
return status;
} }
static void toDataCacheEntry(SDataInserterHandle* pHandle, const SInputData* pInput, SDataInserterBuf* pBuf) { static int32_t getStatus(SDataInserterHandle* pInserter) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); taosThreadMutexLock(&pInserter->mutex);
int32_t status = pInserter->status;
taosThreadMutexUnlock(&pInserter->mutex);
return status;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
pEntry->compressed = 0; SSubmitRspParam* pParam = (SSubmitRspParam*)param;
pEntry->numOfRows = pInput->pData->info.rows; SDataInserterHandle* pInserter = pParam->pInserter;
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows); pInserter->submitRes.code = code;
ASSERT(1 == pEntry->numOfCols);
if (code == TSDB_CODE_SUCCESS) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp(&coder, pInserter->submitRes.pRsp);
if (code) {
tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code;
goto _return;
}
if (pInserter->submitRes.pRsp->nBlocks > 0) {
for (int32_t i = 0; i < pInserter->submitRes.pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pInserter->submitRes.pRsp->pBlocks + i;
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code;
tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code;
goto _return;
}
}
}
pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
qDebug("submit rsp received, affectedRows:%d, total:%d", pInserter->submitRes.pRsp->affectedRows, pInserter->submitRes.affectedRows);
pBuf->useSize = sizeof(SDataCacheEntry); tFreeSSubmitRsp(pInserter->submitRes.pRsp);
}
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); _return:
SDeleterRes* pRes = (SDeleterRes*)pEntry->data; tsem_post(&pInserter->ready);
pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen; taosMemoryFree(param);
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); return TSDB_CODE_SUCCESS;
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
} }
static bool allocBuf(SDataInserterHandle* pDeleter, const SInputData* pInput, SDataInserterBuf* pBuf) {
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDeleter->pDataBlocks));
return false;
}
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize); static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
if (pBuf->pData == NULL) { // send the fetch remote task result reques
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
} }
return NULL != pBuf->pData; SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
} pParam->pInserter = pInserter;
static int32_t updateStatus(SDataInserterHandle* pDeleter) { pMsgSendInfo->param = pParam;
taosThreadMutexLock(&pDeleter->mutex); pMsgSendInfo->msgInfo.pData = pMsg;
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); pMsgSendInfo->msgInfo.len = sizeof(SSubmitReq);
int32_t status = pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
(0 == blockNums ? DS_BUF_EMPTY pMsgSendInfo->fp = inserterCallback;
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDeleter->status = status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t getStatus(SDataInserterHandle* pDeleter) { int64_t transporterId = 0;
taosThreadMutexLock(&pDeleter->mutex); return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
int32_t status = pDeleter->status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
} }
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
SDataInserterBuf* pBuf = taosAllocateQitem(sizeof(SDataInserterBuf), DEF_QITEM);
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDeleter, pInput, pBuf);
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
return TSDB_CODE_SUCCESS;
}
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosThreadMutexLock(&pDeleter->mutex); taosArrayPush(pInserter->pDataBlocks, pInput->pData);
pDeleter->queryEnd = true; SSubmitReq* pMsg = dataBlockToSubmit(pInserter->pDataBlocks, pInserter->pSchema, pInserter->pNode->tableId, pInserter->pNode->suid, pInserter->pNode->vgId);
pDeleter->useconds = useconds;
taosThreadMutexUnlock(&pDeleter->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; if (code) {
if (taosQueueEmpty(pDeleter->pDataBlocks)) { return code;
*pQueryEnd = pDeleter->queryEnd;
*pLen = 0;
return;
} }
SDataInserterBuf* pBuf = NULL; tsem_wait(&pInserter->ready);
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataInserterBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { if (pInserter->submitRes.code) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; return pInserter->submitRes.code;
if (NULL == pDeleter->nextOutput.pData) {
assert(pDeleter->queryEnd);
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;
pOutput->queryEnd = pDeleter->queryEnd;
return TSDB_CODE_SUCCESS;
} }
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); *pContinue = true;
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDeleter);
taosThreadMutexLock(&pDeleter->mutex);
pOutput->queryEnd = pDeleter->queryEnd;
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
taosThreadMutexUnlock(&pDeleter->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosThreadMutexLock(&pInserter->mutex);
pInserter->queryEnd = true;
pInserter->useconds = useconds;
taosThreadMutexUnlock(&pInserter->mutex);
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData); taosArrayDestroy(pInserter->pDataBlocks);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) { taosMemoryFree(pInserter->pSchema);
SDataInserterBuf* pBuf = NULL; taosThreadMutexDestroy(&pInserter->mutex);
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -230,25 +188,39 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat ...@@ -230,25 +188,39 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
inserter->sink.fPut = putDataBlock; inserter->sink.fPut = putDataBlock;
inserter->sink.fEndPut = endPut; inserter->sink.fEndPut = endPut;
inserter->sink.fGetLen = getDataLength; inserter->sink.fGetLen = NULL;
inserter->sink.fGetData = getDataBlock; inserter->sink.fGetData = NULL;
inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fDestroy = destroyDataSinker;
inserter->sink.fGetCacheSize = getCacheSize; inserter->sink.fGetCacheSize = getCacheSize;
inserter->pManager = pManager; inserter->pManager = pManager;
inserter->pDeleter = pDeleterNode; inserter->pNode = pInserterNode;
inserter->pSchema = pDataSink->pInputDataBlockDesc;
inserter->pParam = pParam; inserter->pParam = pParam;
inserter->status = DS_BUF_EMPTY; inserter->status = DS_BUF_EMPTY;
inserter->queryEnd = false; inserter->queryEnd = false;
inserter->pDataBlocks = taosOpenQueue();
int64_t suid = 0;
int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
return code;
}
if (pInserterNode->suid != suid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return terrno;
}
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
taosThreadMutexInit(&inserter->mutex, NULL); taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) { if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
tsem_init(&inserter->ready, 0, 0);
*pHandle = inserter; *pHandle = inserter;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -40,6 +40,8 @@ int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHand ...@@ -40,6 +40,8 @@ int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHand
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
} }
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -52,7 +52,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -52,7 +52,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (handle) { if (handle) {
void* pSinkParam = NULL; void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo); code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -1992,7 +1992,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { ...@@ -1992,7 +1992,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear(pMsgBody); taosMemoryFreeClear(pMsgBody);
} }
void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL); assert(pMsg->info.ahandle != NULL);
...@@ -4687,10 +4687,20 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) { ...@@ -4687,10 +4687,20 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) { int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
switch (pNode->type) { switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
if (NULL == pInserterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserterParam->readHandle = readHandle;
*pParam = pInserterParam;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam)); SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
if (NULL == pDeleterParam) { if (NULL == pDeleterParam) {
......
...@@ -6241,7 +6241,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -6241,7 +6241,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break; break;
case QUERY_NODE_INSERT_STMT: case QUERY_NODE_INSERT_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_SUBMIT; pQuery->msgType = TDMT_SCH_QUERY;
break; break;
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
......
...@@ -80,12 +80,19 @@ typedef struct SQWDebug { ...@@ -80,12 +80,19 @@ typedef struct SQWDebug {
extern SQWDebug gQWDebug; extern SQWDebug gQWDebug;
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg { typedef struct SQWMsg {
void *node; void *node;
int32_t code; int32_t code;
int32_t msgType; int32_t msgType;
char *msg; char *msg;
int32_t msgLen; int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo; SRpcHandleInfo connInfo;
} SQWMsg; } SQWMsg;
...@@ -122,9 +129,11 @@ typedef struct SQWTaskCtx { ...@@ -122,9 +129,11 @@ typedef struct SQWTaskCtx {
int8_t phase; int8_t phase;
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch;
int32_t queryType; int32_t queryType;
int32_t execId; int32_t execId;
bool queryRsped;
bool queryFetched; bool queryFetched;
bool queryEnd; bool queryEnd;
bool queryContinue; bool queryContinue;
...@@ -161,7 +170,7 @@ typedef struct SQWMsgStat { ...@@ -161,7 +170,7 @@ typedef struct SQWMsgStat {
uint64_t queryProcessed; uint64_t queryProcessed;
uint64_t cqueryProcessed; uint64_t cqueryProcessed;
uint64_t fetchProcessed; uint64_t fetchProcessed;
uint64_t fetchRspProcessed; uint64_t rspProcessed;
uint64_t cancelProcessed; uint64_t cancelProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
...@@ -211,8 +220,8 @@ typedef struct SQWorkerMgmt { ...@@ -211,8 +220,8 @@ typedef struct SQWorkerMgmt {
#define QW_STAT_GET(_item) atomic_load_64(&(_item)) #define QW_STAT_GET(_item) atomic_load_64(&(_item))
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) #define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) #define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) #define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
...@@ -221,13 +230,8 @@ typedef struct SQWorkerMgmt { ...@@ -221,13 +230,8 @@ typedef struct SQWorkerMgmt {
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCC || status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART_SUCC)
#define QW_SET_QTID(id, qId, tId, eId) \ #define QW_SET_QTID(id, qId, tId, eId) \
do { \ do { \
*(uint64_t *)(id) = (qId); \ *(uint64_t *)(id) = (qId); \
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
...@@ -366,10 +366,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -366,10 +366,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t eId = msg->execId; int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
qwMsg.msgInfo.explain = msg->explain;
qwMsg.msgInfo.taskType = msg->taskType;
qwMsg.msgInfo.needFetch = msg->needFetch;
char * sql = strndup(msg->msg, msg->sqlLen); char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql)); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node); QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -447,14 +451,14 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -447,14 +451,14 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
if (mgmt) { if (mgmt) {
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
} }
qProcessFetchRsp(NULL, pMsg, NULL); qProcessRspMsg(NULL, pMsg, NULL);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -123,11 +123,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -123,11 +123,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) { if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
break; break;
} }
...@@ -293,7 +293,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen ...@@ -293,7 +293,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
SRpcHandleInfo *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
...@@ -314,13 +313,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -314,13 +313,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
switch (phase) { switch (phase) {
case QW_PHASE_PRE_QUERY: { case QW_PHASE_PRE_QUERY: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
...@@ -334,29 +333,29 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -334,29 +333,29 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break; break;
} }
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { if (!ctx->queryRsped) {
QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
} }
break; break;
} }
case QW_PHASE_PRE_CQUERY: { case QW_PHASE_PRE_CQUERY: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
...@@ -385,11 +384,6 @@ _return: ...@@ -385,11 +384,6 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
} else { } else {
...@@ -411,7 +405,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -411,7 +405,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -420,10 +414,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -420,10 +414,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
connInfo = ctx->ctrlConnInfo; connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo; rspConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); ctx->queryRsped = true;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
if (QW_PHASE_POST_FETCH == phase) { if (QW_PHASE_POST_FETCH == phase) {
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
...@@ -512,7 +506,7 @@ _return: ...@@ -512,7 +506,7 @@ _return:
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
...@@ -525,8 +519,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -525,8 +519,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->taskType = taskType; ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = explain; ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType; ctx->queryType = qwMsg->msgType;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
...@@ -596,7 +591,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -596,7 +591,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
...@@ -633,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -633,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
break; break;
} }
if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
...@@ -695,7 +690,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -695,7 +690,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true; locked = true;
// RC WARNING // RC WARNING
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
...@@ -742,12 +737,12 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -742,12 +737,12 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true; locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping"); QW_TASK_WLOG_E("task already dropping");
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
......
...@@ -296,8 +296,8 @@ extern SSchedulerMgmt schMgmt; ...@@ -296,8 +296,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
...@@ -317,7 +317,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -317,7 +317,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
......
...@@ -247,7 +247,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { ...@@ -247,7 +247,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) { int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -981,6 +981,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -981,6 +981,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->execId = htonl(pTask->execId); pMsg->execId = htonl(pTask->execId);
pMsg->taskType = TASK_TYPE_TEMP; pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->needFetch = SCH_JOB_NEED_FETCH(pJob);
pMsg->phyLen = htonl(pTask->msgLen); pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len); pMsg->sqlLen = htonl(len);
......
...@@ -274,7 +274,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -274,7 +274,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -311,7 +311,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -311,7 +311,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
pTask->lastMsgType = 0; pTask->lastMsgType = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData) { if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} }
...@@ -356,7 +356,7 @@ _return: ...@@ -356,7 +356,7 @@ _return:
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (NULL == pData->pEpSet) { if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
...@@ -490,7 +490,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -490,7 +490,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
...@@ -526,7 +526,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -526,7 +526,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode); SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else { } else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
...@@ -594,7 +594,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { ...@@ -594,7 +594,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps); SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册