提交 8fdb4d48 编写于 作者: P plum-lihui

Merge branch '3.0' into test3.0/lihui

......@@ -54,12 +54,12 @@ enum {
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
typedef enum EStreamType {
......
......@@ -77,6 +77,13 @@ typedef struct {
SSubmitReq* data;
} SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
typedef struct {
int8_t type;
......
......@@ -87,6 +87,14 @@ void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfEle
*/
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/**
*
* @param pArray
* @param comparFn
* @param fp
*/
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/**
* add all element from the source array list into the destination
* @param pArray
......
......@@ -452,7 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t code = -1;
SArray *newSub = subscribe.topicNames;
taosArraySortString(newSub, taosArrayCompareString);
taosArrayRemoveDuplicate(newSub, taosArrayCompareString, taosMemoryFree);
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
......
......@@ -91,7 +91,7 @@ int metaBegin(SMeta* pMeta);
int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
......
......@@ -212,7 +212,7 @@ _err:
return -1;
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
......@@ -228,8 +228,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
}
// drop all child tables
TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
......@@ -249,20 +248,18 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
break;
}
taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid));
taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid));
}
tdbTbcClose(pCtbIdxc);
metaWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild);
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
metaDropTableByUid(pMeta, uid, NULL);
}
taosArrayDestroy(pArray);
// drop super table
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
......
......@@ -49,8 +49,8 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
return -1;
}
char* tbName = strdup(mr.me.name);
......@@ -87,16 +87,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
tqDebug("task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} else {
pRsp->withTbName = 0;
}
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
......@@ -193,13 +195,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
......@@ -211,13 +214,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
......
......@@ -299,6 +299,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
}
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
......
......@@ -178,6 +178,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId);
......
......@@ -557,6 +557,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
SVDropStbReq req = {0};
int32_t rcode = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
SArray *tbUidList = NULL;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->pCont = NULL;
......@@ -570,7 +571,14 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
}
// process request
if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
tbUidList = taosArrayInit(8, sizeof(int64_t));
if (tbUidList == NULL) goto _exit;
if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
rcode = terrno;
goto _exit;
}
if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
rcode = terrno;
goto _exit;
}
......@@ -582,6 +590,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
// return rsp
_exit:
if (tbUidList) taosArrayDestroy(tbUidList);
pRsp->code = rcode;
tDecoderClear(&decoder);
return 0;
......
......@@ -42,17 +42,32 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
pInfo->validBlockIndex = 0;
taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/
/*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
/*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/
ASSERT(numOfBlocks == 1);
/*if (numOfBlocks == 1) {*/
taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
/*} else {*/
/*}*/
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
// TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info;
......@@ -60,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else {
ASSERT(0);
}
......@@ -167,7 +183,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return pTaskInfo;
}
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList,
const char* idstr) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table.
......
......@@ -1392,24 +1392,49 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
// todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
while (1) {
if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
return NULL;
}
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
pTaskInfo->code = code;
return NULL;
int32_t current = pInfo->validBlockIndex++;
SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum);
pInfo->tqReader->pMsg = NULL;
continue;
}
}
setBlockIntoRes(pInfo, &block);
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block);
if (pBlockInfo->rows > 0) {
break;
}
}
if (pBlockInfo->rows > 0) {
break;
} else {
pInfo->tqReader->pMsg = NULL;
continue;
}
/*blockDataCleanup(pInfo->pRes);*/
}
// record the scan action.
......@@ -2557,30 +2582,30 @@ typedef struct STableMergeScanInfo {
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
......@@ -2588,7 +2613,8 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) {
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus
......
......@@ -77,6 +77,28 @@ FAIL:
return NULL;
}
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged;
FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->reqs, &pSubmit->data);
pMerged->ver = pSubmit->ver;
return 0;
}
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}
......@@ -100,15 +122,31 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
}
}
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) {
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
return 0;
taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
taosFreeQitem(dst);
taosFreeQitem(elem);
return (SStreamQueueItem*)pMerged;
} else {
return -1;
return NULL;
}
}
......@@ -123,5 +161,20 @@ void streamFreeQitem(SStreamQueueItem* data) {
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--;
if (*ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data);
taosMemoryFree(ref);
}
}
taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
}
}
......@@ -33,9 +33,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
return 0;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false);
} else {
ASSERT(0);
}
// exec
......@@ -144,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
while (1) {
int32_t cnt = 0;
int32_t cnt = 1;
void* data = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
......@@ -155,24 +159,23 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
} else {
break;
}
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
/*}*/
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
data = newRet;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
}
}
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
......@@ -194,6 +197,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
return NULL;
......@@ -201,6 +205,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
......
......@@ -306,6 +306,8 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
break;
}
syncNodeRestartHeartbeatTimer(pSyncNode);
return ret;
}
......
......@@ -417,7 +417,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
......@@ -425,7 +425,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
return -1;
}
seeked = true;
......@@ -452,7 +452,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidHeadCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......@@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
if (pRead->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
......@@ -489,7 +490,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidBodyCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
......
......@@ -173,6 +173,46 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
pArray->size = pos + 1;
}
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
}
int32_t pos = 0;
for (int32_t i = 1; i < size; ++i) {
char* p1 = taosArrayGet(pArray, pos);
char* p2 = taosArrayGet(pArray, i);
if (comparFn(p1, p2) == 0) {
// do nothing
} else {
if (pos + 1 != i) {
void* p = taosArrayGet(pArray, pos + 1);
if (fp != NULL) {
fp(p);
}
taosArraySet(pArray, pos + 1, p2);
pos += 1;
} else {
pos += 1;
}
}
}
if (fp != NULL) {
for (int32_t i = pos + 1; i < pArray->size; ++i) {
void* p = taosArrayGetP(pArray, i);
fp(p);
}
}
pArray->size = pos + 1;
}
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
if (pInput) {
return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput));
......
......@@ -88,7 +88,7 @@ class TDTestCase:
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tdLog.info("After waiting for a period of time, drop one stable")
time.sleep(10)
time.sleep(3)
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
tdLog.info("wait result from consumer, then check it")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册