提交 19852ba8 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into enh/msgRefactor2

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 23e2b73 GIT_TAG e00ebd9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -59,7 +59,8 @@ function check_taosd_exit_type() { ...@@ -59,7 +59,8 @@ function check_taosd_exit_type() {
if [ ! -z "$core_files" ]; then if [ ! -z "$core_files" ]; then
# move core files to another folder # move core files to another folder
mkdir -p ${BACKUP_CORE_FOLDER} mkdir -p ${BACKUP_CORE_FOLDER}
mv ${core_folder}/${core_prefix}* ${BACKUP_CORE_FOLDER}/ cp ${core_folder}/${core_prefix}* ${BACKUP_CORE_FOLDER}/
rm -f ${core_folder}/${core_prefix}*
set_service_state "error" "taosd exit with core file" set_service_state "error" "taosd exit with core file"
else else
set_service_state "error" "taosd exit without core file" set_service_state "error" "taosd exit without core file"
......
...@@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v ...@@ -239,6 +239,7 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t v
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
...@@ -677,7 +677,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -677,7 +677,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) { if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
/*ASSERT(0);*/
} }
if (taosxRsp.blockNum > 0 /* threshold */) { if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
......
...@@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs ...@@ -44,7 +44,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0; return 0;
} }
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success // TODO add reference to gurantee success
...@@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -52,8 +52,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} }
char* tbName = strdup(mr.me.name); for (int32_t i = 0; i < n; i++) {
taosArrayPush(pRsp->blockTbName, &tbName); char* tbName = strdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
}
metaReaderClear(&mr); metaReaderClear(&mr);
return 0; return 0;
} }
...@@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -111,7 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); tqAddTbNameToRsp(pTq, uid, pRsp, 1);
} else { } else {
pRsp->withTbName = false; pRsp->withTbName = false;
} }
...@@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -155,7 +157,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
int64_t uid = 0; int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid; uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
continue; continue;
} }
} else { } else {
...@@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -225,18 +227,30 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue; continue;
} }
} }
...@@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -255,25 +269,37 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
pTq->pVnode->config.tsdbCfg.precision); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
blockDataFreeRes(&block); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0); tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; /*SSDataBlock block = {0};*/
if (tqRetrieveDataBlock(&block, pReader) < 0) { /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
blockDataFreeRes(&block); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
continue; taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
return -1;
} }
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
...@@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -291,14 +317,26 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
pTq->pVnode->config.tsdbCfg.precision); /*pTq->pVnode->config.tsdbCfg.precision);*/
blockDataFreeRes(&block); /*blockDataFreeRes(&block);*/
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
pRsp->blockNum++; /*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
} }
} }
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
return -1; return -1;
} }
......
...@@ -556,7 +556,7 @@ FAIL: ...@@ -556,7 +556,7 @@ FAIL:
return -1; return -1;
} }
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t sversion = htonl(pReader->pBlock->sversion); int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
...@@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -592,9 +592,10 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
int32_t colAtMost = pSchemaWrapper->nCols; int32_t colAtMost = pSchemaWrapper->nCols;
int32_t curRow = 0; int32_t curRow = 0;
int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned) return -1; if (assigned == NULL) return -1;
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
STSRowIter iter = {0}; STSRowIter iter = {0};
...@@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -605,11 +606,13 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
bool buildNew = false; bool buildNew = false;
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
tqDebug("vgId:%d, row of block %d", pReader->pWalReader->pWal->cfg.vgId, curRow);
for (int32_t i = 0; i < colAtMost; i++) { for (int32_t i = 0; i < colAtMost; i++) {
SCellVal sVal = {0}; SCellVal sVal = {0};
if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) { if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) {
break; break;
} }
tqDebug("vgId:%d, %d col, type %d", pReader->pWalReader->pWal->cfg.vgId, i, sVal.valType);
if (curRow == 0) { if (curRow == 0) {
assigned[i] = sVal.valType != TD_VTYPE_NONE; assigned[i] = sVal.valType != TD_VTYPE_NONE;
buildNew = true; buildNew = true;
...@@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -623,27 +626,42 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
} }
if (buildNew) { if (buildNew) {
SSDataBlock block; if (taosArrayGetSize(blocks) > 0) {
SSchemaWrapper sw; SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) { pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock* pBlock = createDataBlock();
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (tqMaskBlock(pSW, pBlock, pSchemaWrapper, assigned) < 0) {
blockDataDestroy(pBlock);
goto FAIL; goto FAIL;
} }
SSDataBlock block = {0};
assignOneDataBlock(&block, pBlock);
blockDataDestroy(pBlock);
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
taosArrayPush(blocks, &block); taosArrayPush(blocks, &block);
taosArrayPush(schemas, &sw); taosArrayPush(schemas, &pSW);
} }
SSDataBlock* pBlock = taosArrayGetLast(blocks); SSDataBlock* pBlock = taosArrayGetLast(blocks);
pBlock->info.uid = pReader->msgIter.uid; pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.rows = 0;
pBlock->info.version = pReader->pMsg->version; pBlock->info.version = pReader->pMsg->version;
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) { if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterReset(&iter, row);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SCellVal sVal = {0}; SCellVal sVal = {0};
...@@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch ...@@ -654,12 +672,16 @@ int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* sch
ASSERT(sVal.valType != TD_VTYPE_NONE); ASSERT(sVal.valType != TD_VTYPE_NONE);
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) { if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
goto FAIL; goto FAIL;
} }
tqDebug("vgId:%d, row %d col %d append %d", pReader->pWalReader->pWal->cfg.vgId, curRow, i,
sVal.valType == TD_VTYPE_NULL);
} }
curRow++; curRow++;
} }
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
taosMemoryFree(assigned); taosMemoryFree(assigned);
return 0; return 0;
......
...@@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -349,7 +349,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
.contLen = len + sizeof(SMsgHead), .contLen = len + sizeof(SMsgHead),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
} else { } else {
...@@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -476,12 +475,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
SSubmitReq* ret = rpcMallocCont(cap); SSubmitReq* pSubmit = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId; pSubmit->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); pSubmit->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(1); pSubmit->numOfBlocks = htonl(1);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
...@@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -531,17 +530,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
ret->length = htonl(ret->length); pSubmit->length = htonl(pSubmit->length);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
.pCont = ret, .pCont = pSubmit,
.contLen = ntohl(ret->length), .contLen = ntohl(pSubmit->length),
}; };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(ret);
tqDebug("failed to put into write-queue since %s", terrstr()); tqDebug("failed to put into write-queue since %s", terrstr());
} }
} }
......
...@@ -239,6 +239,7 @@ typedef struct SSourceDataInfo { ...@@ -239,6 +239,7 @@ typedef struct SSourceDataInfo {
int32_t index; int32_t index;
SRetrieveTableRsp* pRsp; SRetrieveTableRsp* pRsp;
uint64_t totalRows; uint64_t totalRows;
int64_t startTime;
int32_t code; int32_t code;
EX_SOURCE_STATUS status; EX_SOURCE_STATUS status;
const char* taskId; const char* taskId;
......
...@@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper { ...@@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper {
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
static void freeBlock(void* pParam); static void freeBlock(void* pParam);
static void freeSourceDataInfo(void* param); static void freeSourceDataInfo(void* param);
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs); static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex); static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
...@@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo); int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
if (completed == totalSources) { if (completed == totalSources) {
setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs); setAllSourcesCompleted(pOperator);
return; return;
} }
...@@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
} }
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
pDataInfo->totalRows += pRetrieveRsp->numOfRows;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
...@@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas ...@@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
pDataInfo->startTime = taosGetTimestampUs();
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
...@@ -508,18 +510,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo ...@@ -508,18 +510,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms", qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
pLoadInfo->totalElapsed / 1000.0); pLoadInfo->totalElapsed / 1000.0);
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
...@@ -581,7 +579,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -581,7 +579,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
while (1) { while (1) {
if (pExchangeInfo->current >= totalSources) { if (pExchangeInfo->current >= totalSources) {
setAllSourcesCompleted(pOperator, startTs); setAllSourcesCompleted(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { ...@@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
...@@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pInfo->groupSort = pMergePhyNode->groupSort; pInfo->groupSort = pMergePhyNode->groupSort;
......
...@@ -24,6 +24,8 @@ static double tlog2(double v, double base) { ...@@ -24,6 +24,8 @@ static double tlog2(double v, double base) {
return a; return a;
} else if (isnan(b) || isinf(b)) { } else if (isnan(b) || isinf(b)) {
return b; return b;
} else if (b == 0) {
return INFINITY;
} else { } else {
return a / b; return a / b;
} }
......
...@@ -411,6 +411,7 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { ...@@ -411,6 +411,7 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
streamStatePut(pState, &tmp, NULL, 0); streamStatePut(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp); SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel(pState, &tmp); streamStateDel(pState, &tmp);
return code; return code;
} }
......
此差异已折叠。
...@@ -21,7 +21,14 @@ LOG_DIR=$TAOS_DIR/sim/asan ...@@ -21,7 +21,14 @@ LOG_DIR=$TAOS_DIR/sim/asan
error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l` error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l`
memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l` memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l`
indirect_leak=`cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l` indirect_leak=`cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l`
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | wc -l`
# ignore
# /root/TDengine/source/libs/scalar/src/sclfunc.c:735:11: runtime error: 4.75783e+11 is outside the range of representable values of type 'signed char'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:790:11: runtime error: 3.4e+38 is outside the range of representable values of type 'long int'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:772:11: runtime error: 3.52344e+09 is outside the range of representable values of type 'int'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:753:11: runtime error: 4.75783e+11 is outside the range of representable values of type 'short int'
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | wc -l`
python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l` python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m" echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
......
...@@ -77,13 +77,14 @@ echo "Preload AsanSo:" $? ...@@ -77,13 +77,14 @@ echo "Preload AsanSo:" $?
$* -a 2> $AsanFile $* -a 2> $AsanFile
unset LD_PRELOAD unset LD_PRELOAD
AsanFileLen=`cat $AsanFile | wc -l` for ((i=1;i<=20;i++))
while [ $AsanFileLen -lt 10 ]
do do
sleep 1 AsanFileLen=`cat $AsanFile | wc -l`
`cat $AsanFile | wc -l` echo "AsanFileLen:" $AsanFileLen
if [ $AsanFileLen -gt 10 ]; then
break
fi
done done
echo "AsanFileLen:" $AsanFileLen
AsanFileSuccessLen=`grep -w successfully $AsanFile | wc -l` AsanFileSuccessLen=`grep -w successfully $AsanFile | wc -l`
echo "AsanFileSuccessLen:" $AsanFileSuccessLen echo "AsanFileSuccessLen:" $AsanFileSuccessLen
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册