提交 40470531 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/row_optimize

...@@ -1615,15 +1615,21 @@ typedef struct SSubQueryMsg { ...@@ -1615,15 +1615,21 @@ typedef struct SSubQueryMsg {
uint64_t taskId; uint64_t taskId;
int64_t refId; int64_t refId;
int32_t execId; int32_t execId;
int32_t msgMask;
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch; int8_t needFetch;
uint32_t sqlLen; // the query sql, uint32_t sqlLen;
uint32_t phyLen; char *sql;
int32_t msgMask; uint32_t msgLen;
char msg[]; char *msg;
} SSubQueryMsg; } SSubQueryMsg;
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
uint64_t sId; uint64_t sId;
...@@ -1732,6 +1738,13 @@ typedef struct { ...@@ -1732,6 +1738,13 @@ typedef struct {
int32_t execId; int32_t execId;
} STaskDropReq; } STaskDropReq;
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
typedef struct { typedef struct {
int32_t code; int32_t code;
} STaskDropRsp; } STaskDropRsp;
......
...@@ -4643,6 +4643,178 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { ...@@ -4643,6 +4643,178 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
return 0; return 0;
} }
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->refId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->msgMask) < 0) return -1;
if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->explain) < 0) return -1;
if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1;
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->msgMask) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
if (NULL == pReq) {
return;
}
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->msg);
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->refId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
......
...@@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v ...@@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
...@@ -671,7 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -671,7 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) { if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
/*ASSERT(0);*/
} }
if (taosxRsp.blockNum > 0 /* threshold */) { if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
......
...@@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs ...@@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0; return 0;
} }
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success // TODO add reference to gurantee success
...@@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} }
char* tbName = strdup(mr.me.name); for (int32_t i = 0; i < n; i++) {
taosArrayPush(pRsp->blockTbName, &tbName); char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
}
metaReaderClear(&mr); metaReaderClear(&mr);
return 0; return 0;
} }
...@@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); tqAddTbNameToRsp(pTq, uid, pRsp, 1);
} else { } else {
pRsp->withTbName = false; pRsp->withTbName = false;
} }
...@@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
int64_t uid = 0; int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid; uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
continue; continue;
} }
} else { } else {
...@@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue; continue;
} }
} }
...@@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
pTq->pVnode->config.tsdbCfg.precision); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
blockDataFreeRes(&block); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
continue; taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
return -1;
} }
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
...@@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
pTq->pVnode->config.tsdbCfg.precision); /*pTq->pVnode->config.tsdbCfg.precision);*/
blockDataFreeRes(&block); /*blockDataFreeRes(&block);*/
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
pRsp->blockNum++; /*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} }
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
return -1; return -1;
} }
......
...@@ -556,7 +556,7 @@ FAIL: ...@@ -556,7 +556,7 @@ FAIL:
return -1; return -1;
} }
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t sversion = htonl(pReader->pBlock->sversion); int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
...@@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
int32_t colAtMost = pSchemaWrapper->nCols; int32_t colAtMost = pSchemaWrapper->nCols;
int32_t curRow = 0; int32_t curRow = 0;
int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned) return -1; if (assigned == NULL) return -1;
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
STSRowIter iter = {0}; STSRowIter iter = {0};
...@@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
bool buildNew = false; bool buildNew = false;
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
tqDebug("vgId:%d, row of block %d", pReader->pWalReader->pWal->cfg.vgId, curRow);
for (int32_t i = 0; i < colAtMost; i++) { for (int32_t i = 0; i < colAtMost; i++) {
SCellVal sVal = {0}; SCellVal sVal = {0};
if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) { if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) {
break; break;
} }
tqDebug("vgId:%d, %d col, type %d", pReader->pWalReader->pWal->cfg.vgId, i, sVal.valType);
if (curRow == 0) { if (curRow == 0) {
assigned[i] = sVal.valType != TD_VTYPE_NONE; assigned[i] = sVal.valType != TD_VTYPE_NONE;
buildNew = true; buildNew = true;
...@@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
} }
if (buildNew) { if (buildNew) {
SSDataBlock block; if (taosArrayGetSize(blocks) > 0) {
SSchemaWrapper sw; SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) { pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock* pBlock = createDataBlock();
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (tqMaskBlock(pSW, pBlock, pSchemaWrapper, assigned) < 0) {
blockDataDestroy(pBlock);
goto FAIL; goto FAIL;
} }
SSDataBlock block = {0};
assignOneDataBlock(&block, pBlock);
blockDataDestroy(pBlock);
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
taosArrayPush(blocks, &block); taosArrayPush(blocks, &block);
taosArrayPush(schemas, &sw); taosArrayPush(schemas, &pSW);
} }
SSDataBlock* pBlock = taosArrayGetLast(blocks); SSDataBlock* pBlock = taosArrayGetLast(blocks);
pBlock->info.uid = pReader->msgIter.uid; pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.rows = 0;
pBlock->info.version = pReader->pMsg->version; pBlock->info.version = pReader->pMsg->version;
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) { if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterReset(&iter, row);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SCellVal sVal = {0}; SCellVal sVal = {0};
...@@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
ASSERT(sVal.valType != TD_VTYPE_NONE); ASSERT(sVal.valType != TD_VTYPE_NONE);
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) { if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
goto FAIL; goto FAIL;
} }
tqDebug("vgId:%d, row %d col %d append %d", pReader->pWalReader->pWal->cfg.vgId, curRow, i,
sVal.valType == TD_VTYPE_NULL);
} }
curRow++; curRow++;
} }
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
taosMemoryFree(assigned); taosMemoryFree(assigned);
return 0; return 0;
......
...@@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
.contLen = len + sizeof(SMsgHead), .contLen = len + sizeof(SMsgHead),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
} else { } else {
...@@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
SSubmitReq* ret = rpcMallocCont(cap); SSubmitReq* pSubmit = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId; pSubmit->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); pSubmit->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(1); pSubmit->numOfBlocks = htonl(1);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
...@@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
ret->length = htonl(ret->length); pSubmit->length = htonl(pSubmit->length);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
.pCont = ret, .pCont = pSubmit,
.contLen = ntohl(ret->length), .contLen = ntohl(pSubmit->length),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(ret);
tqDebug("failed to put into write-queue since %s", terrstr()); tqDebug("failed to put into write-queue since %s", terrstr());
} }
} }
......
...@@ -239,6 +239,7 @@ typedef struct SSourceDataInfo { ...@@ -239,6 +239,7 @@ typedef struct SSourceDataInfo {
int32_t index; int32_t index;
SRetrieveTableRsp* pRsp; SRetrieveTableRsp* pRsp;
uint64_t totalRows; uint64_t totalRows;
int64_t startTime;
int32_t code; int32_t code;
EX_SOURCE_STATUS status; EX_SOURCE_STATUS status;
const char* taskId; const char* taskId;
......
...@@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper { ...@@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper {
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
static void freeBlock(void* pParam); static void freeBlock(void* pParam);
static void freeSourceDataInfo(void* param); static void freeSourceDataInfo(void* param);
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs); static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex); static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
...@@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo); int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
if (completed == totalSources) { if (completed == totalSources) {
setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs); setAllSourcesCompleted(pOperator);
return; return;
} }
...@@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
} }
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
pDataInfo->totalRows += pRetrieveRsp->numOfRows;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
...@@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas ...@@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
pDataInfo->startTime = taosGetTimestampUs();
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
...@@ -493,18 +495,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo ...@@ -493,18 +495,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms", qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
pLoadInfo->totalElapsed / 1000.0); pLoadInfo->totalElapsed / 1000.0);
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
...@@ -566,7 +564,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -566,7 +564,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
while (1) { while (1) {
if (pExchangeInfo->current >= totalSources) { if (pExchangeInfo->current >= totalSources) {
setAllSourcesCompleted(pOperator, startTs); setAllSourcesCompleted(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { ...@@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
...@@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pInfo->groupSort = pMergePhyNode->groupSort; pInfo->groupSort = pMergePhyNode->groupSort;
......
...@@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c ...@@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0; int64_t affectedRows = ctx ? ctx->affectedRows : 0;
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp rsp = {0};
pRsp->code = htonl(code); rsp.code = code;
pRsp->affectedRows = htobe64(affectedRows); rsp.affectedRows = affectedRows;
if (tbInfo) { if (tbInfo) {
strcpy(pRsp->tbFName, tbInfo->tbFName); strcpy(rsp.tbFName, tbInfo->tbFName);
pRsp->sversion = htonl(tbInfo->sversion); rsp.sversion = tbInfo->sversion;
pRsp->tversion = htonl(tbInfo->tversion); rsp.tversion = tbInfo->tversion;
}
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
if (msgSize < 0) {
qError("tSerializeSQueryTableRsp failed");
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *pRsp = rpcMallocCont(msgSize);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
qError("tSerializeSQueryTableRsp %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = rspType, .msgType = rspType,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = msgSize,
.code = code, .code = code,
.info = *pConn, .info = *pConn,
}; };
...@@ -182,23 +200,37 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { ...@@ -182,23 +200,37 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
#endif #endif
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq qMsg;
if (NULL == req) { qMsg.header.vgId = mgmt->nodeId;
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); qMsg.header.contLen = 0;
qMsg.sId = sId;
qMsg.queryId = qId;
qMsg.taskId = tId;
qMsg.refId = rId;
qMsg.execId = eId;
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
if (msgSize < 0) {
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = rpcMallocCont(msgSize);
if (NULL == msg) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
rpcFreeCont(msg);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
req->refId = rId;
req->execId = eId;
SRpcMsg pNewMsg = { SRpcMsg pNewMsg = {
.msgType = TDMT_SCH_DROP_TASK, .msgType = TDMT_SCH_DROP_TASK,
.pCont = req, .pCont = msg,
.contLen = sizeof(STaskDropReq), .contLen = msgSize,
.code = 0, .code = 0,
.info = *pConn, .info = *pConn,
}; };
...@@ -247,22 +279,37 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { ...@@ -247,22 +279,37 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
} }
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq qMsg;
if (NULL == req) { qMsg.header.vgId = mgmt->nodeId;
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); qMsg.header.contLen = 0;
qMsg.sId = sId;
qMsg.queryId = qId;
qMsg.taskId = tId;
qMsg.refId = rId;
qMsg.execId = eId;
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
if (msgSize < 0) {
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = rpcMallocCont(msgSize);
if (NULL == msg) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
rpcFreeCont(msg);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
req->header.vgId = htonl(mgmt->nodeId);
req->sId = htobe64(sId);
req->queryId = htobe64(qId);
req->taskId = htobe64(tId);
req->refId = htobe64(rId);
SRpcMsg brokenMsg = { SRpcMsg brokenMsg = {
.msgType = TDMT_SCH_DROP_TASK, .msgType = TDMT_SCH_DROP_TASK,
.pCont = req, .pCont = msg,
.contLen = sizeof(STaskDropReq), .contLen = msgSize,
.code = TSDB_CODE_RPC_BROKEN_LINK, .code = TSDB_CODE_RPC_BROKEN_LINK,
.info = *pConn, .info = *pConn,
}; };
...@@ -312,40 +359,33 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran ...@@ -312,40 +359,33 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
} }
int32_t code = 0; int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SSubQueryMsg msg = {0};
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
msg->queryId = be64toh(msg->queryId); QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
msg->taskId = be64toh(msg->taskId); tFreeSSubQueryMsg(&msg);
msg->refId = be64toh(msg->refId);
msg->execId = ntohl(msg->execId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
msg->msgMask = ntohl(msg->msgMask);
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
} }
uint64_t sId = msg->sId; uint64_t sId = msg.sId;
uint64_t qId = msg->queryId; uint64_t qId = msg.queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg.taskId;
int64_t rId = msg->refId; int64_t rId = msg.refId;
int32_t eId = msg->execId; int32_t eId = msg.execId;
SQWMsg qwMsg = { SQWMsg qwMsg = {
.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; .msgType = pMsg->msgType, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p, SQL:%s", pMsg->info.handle, msg.sql);
QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg)); code = qwPreprocessQuery(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
tFreeSSubQueryMsg(&msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -355,19 +395,25 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -355,19 +395,25 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSubQueryMsg *msg = pMsg->pCont;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SSubQueryMsg msg = {0};
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
uint64_t sId = msg->sId; uint64_t sId = msg.sId;
uint64_t qId = msg->queryId; uint64_t qId = msg.queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg.taskId;
int64_t rId = msg->refId; int64_t rId = msg.refId;
int32_t eId = msg->execId; int32_t eId = msg.execId;
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
qwAbortPrerocessQuery(QW_FPARAMS()); qwAbortPrerocessQuery(QW_FPARAMS());
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
tFreeSSubQueryMsg(&msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -377,42 +423,41 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -377,42 +423,41 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
} }
int32_t code = 0; int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { SSubQueryMsg msg = {0};
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
uint64_t sId = msg->sId; uint64_t sId = msg.sId;
uint64_t qId = msg->queryId; uint64_t qId = msg.queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg.taskId;
int64_t rId = msg->refId; int64_t rId = msg.refId;
int32_t eId = msg->execId; int32_t eId = msg.execId;
SQWMsg qwMsg = {.node = node, SQWMsg qwMsg = {.node = node,
.msg = msg->msg + msg->sqlLen, .msg = msg.msg,
.msgLen = msg->phyLen, .msgLen = msg.msgLen,
.connInfo = pMsg->info, .connInfo = pMsg->info,
.msgType = pMsg->msgType}; .msgType = pMsg->msgType};
qwMsg.msgInfo.explain = msg->explain; qwMsg.msgInfo.explain = msg.explain;
qwMsg.msgInfo.taskType = msg->taskType; qwMsg.msgInfo.taskType = msg.taskType;
qwMsg.msgInfo.needFetch = msg->needFetch; qwMsg.msgInfo.needFetch = msg.needFetch;
char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
pMsg->info.handle, sql); pMsg->info.handle, msg.sql);
QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
msg.sql = NULL;
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
_return: tFreeSSubQueryMsg(&msg);
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code);
return code; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
...@@ -548,28 +593,22 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 ...@@ -548,28 +593,22 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
} }
int32_t code = 0; int32_t code = 0;
STaskDropReq *msg = pMsg->pCont;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { STaskDropReq msg = {0};
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); if (tDeserializeSTaskDropReq(pMsg->pCont, pMsg->contLen, &msg) < 0) {
QW_ELOG("tDeserializeSTaskDropReq failed, contLen:%d", pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); uint64_t sId = msg.sId;
msg->queryId = be64toh(msg->queryId); uint64_t qId = msg.queryId;
msg->taskId = be64toh(msg->taskId); uint64_t tId = msg.taskId;
msg->refId = be64toh(msg->refId); int64_t rId = msg.refId;
msg->execId = ntohl(msg->execId); int32_t eId = msg.execId;
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
......
...@@ -114,7 +114,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { ...@@ -114,7 +114,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1)); qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
qwtqueryMsg.sId = htobe64(1); qwtqueryMsg.sId = htobe64(1);
qwtqueryMsg.taskId = htobe64(1); qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.phyLen = htonl(100); qwtqueryMsg.msgLen = htonl(100);
qwtqueryMsg.sqlLen = 0; qwtqueryMsg.sqlLen = 0;
queryRpc->msgType = TDMT_SCH_QUERY; queryRpc->msgType = TDMT_SCH_QUERY;
queryRpc->pCont = &qwtqueryMsg; queryRpc->pCont = &qwtqueryMsg;
...@@ -131,12 +131,29 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { ...@@ -131,12 +131,29 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
} }
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1); dropMsg->sId = 1;
dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
dropMsg->taskId = htobe64(1); dropMsg->taskId = 1;
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
if (msgSize < 0) {
return;
}
char *msg = (char*)taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return;
}
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
taosMemoryFree(msg);
return;
}
dropRpc->msgType = TDMT_SCH_DROP_TASK; dropRpc->msgType = TDMT_SCH_DROP_TASK;
dropRpc->pCont = dropMsg; dropRpc->pCont = msg;
dropRpc->contLen = sizeof(STaskDropReq); dropRpc->contLen = msgSize;
} }
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
......
...@@ -334,17 +334,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -334,17 +334,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp rsp = {0};
rsp->code = ntohl(rsp->code); if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) {
rsp->sversion = ntohl(rsp->sversion); SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
rsp->tversion = ntohl(rsp->tversion); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
rsp->affectedRows = be64toh(rsp->affectedRows); }
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp.code);
SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp)); SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
...@@ -1042,30 +1042,40 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1042,30 +1042,40 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_SCH_MERGE_QUERY: { case TDMT_SCH_MERGE_QUERY: {
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
uint32_t len = strlen(pJob->sql); SSubQueryMsg qMsg;
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len; qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = schMgmt.sId;
qMsg.queryId = pJob->queryId;
qMsg.taskId = pTask->taskId;
qMsg.refId = pJob->refId;
qMsg.execId = pTask->execId;
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
qMsg.taskType = TASK_TYPE_TEMP;
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
qMsg.sqlLen = strlen(pJob->sql);
qMsg.sql = pJob->sql;
qMsg.msgLen = pTask->msgLen;
qMsg.msg = pTask->msg;
msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SSubQueryMsg *pMsg = msg; if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) {
pMsg->header.vgId = htonl(addr->nodeId); SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize);
pMsg->sId = htobe64(schMgmt.sId); taosMemoryFree(msg);
pMsg->queryId = htobe64(pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
pMsg->taskId = htobe64(pTask->taskId); }
pMsg->refId = htobe64(pJob->refId);
pMsg->execId = htonl(pTask->execId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len);
pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0);
memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true; persistHandle = true;
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
...@@ -1092,22 +1102,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1092,22 +1102,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break; break;
} }
case TDMT_SCH_DROP_TASK: { case TDMT_SCH_DROP_TASK: {
msgSize = sizeof(STaskDropReq); STaskDropReq qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = schMgmt.sId;
qMsg.queryId = pJob->queryId;
qMsg.taskId = pTask->taskId;
qMsg.refId = pJob->refId;
qMsg.execId = pTask->execId;
msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
STaskDropReq *pMsg = msg; if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
pMsg->header.vgId = htonl(addr->nodeId); taosMemoryFree(msg);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
pMsg->sId = htobe64(schMgmt.sId); }
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
pMsg->execId = htonl(pTask->execId);
break; break;
} }
case TDMT_SCH_QUERY_HEARTBEAT: { case TDMT_SCH_QUERY_HEARTBEAT: {
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册