未验证 提交 7a5be43d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17954 from taosdata/feature/stream

fix(stream): delete tb should be checked in write thread
...@@ -3134,7 +3134,8 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); ...@@ -3134,7 +3134,8 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct { typedef struct {
int64_t uid; // int64_t uid;
char tbname[TSDB_TABLE_NAME_LEN];
int64_t ts; int64_t ts;
} SSingleDeleteReq; } SSingleDeleteReq;
......
...@@ -317,6 +317,8 @@ typedef struct SStreamTask { ...@@ -317,6 +317,8 @@ typedef struct SStreamTask {
int8_t inputStatus; int8_t inputStatus;
int8_t outputStatus; int8_t outputStatus;
// STaosQueue* inputQueue1;
// STaosQall* inputQall;
SStreamQueue* inputQueue; SStreamQueue* inputQueue;
SStreamQueue* outputQueue; SStreamQueue* outputQueue;
......
...@@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch); pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1667,7 +1667,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1667,7 +1667,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
......
...@@ -6131,13 +6131,13 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { ...@@ -6131,13 +6131,13 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
} }
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1; if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
return 0; return 0;
} }
......
...@@ -25,6 +25,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -25,6 +25,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
tqDebug("stream delete msg: row %d", totRow);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
...@@ -36,11 +38,14 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -36,11 +38,14 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
} else { } else {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
} }
tqDebug("stream delete msg: groupId :%" PRId64 ", name: %s", groupId, name); tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId,
name, ts);
#if 0
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, name) < 0) { if (metaGetTableEntryByName(&mr, name) < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
taosMemoryFree(name); taosMemoryFree(name);
continue; continue;
} }
...@@ -48,10 +53,13 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -48,10 +53,13 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
int64_t uid = mr.me.uid; int64_t uid = mr.me.uid;
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(name); taosMemoryFree(name);
#endif
SSingleDeleteReq req = { SSingleDeleteReq req = {
.ts = ts, .ts = ts,
.uid = uid,
}; };
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
taosMemoryFree(name);
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
taosArrayPush(deleteReq->deleteReqs, &req); taosArrayPush(deleteReq->deleteReqs, &req);
} }
return 0; return 0;
...@@ -309,6 +317,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -309,6 +317,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
deleteReq.suid = suid; deleteReq.suid = suid;
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
continue;
}
int32_t len; int32_t len;
int32_t code; int32_t code;
......
...@@ -117,8 +117,8 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI ...@@ -117,8 +117,8 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info); metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
} }
if (pMsgIter->sversion != info.skmVer) { if (pMsgIter->sversion != info.skmVer) {
tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode),
TD_VID(pTsdb->pVnode), pMsgIter->sversion, info.skmVer, suid, uid); pMsgIter->sversion, info.skmVer, suid, uid);
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
goto _err; goto _err;
} }
...@@ -198,14 +198,14 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -198,14 +198,14 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
} }
tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s", " at version %" PRId64 " since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
return code; return code;
_err: _err:
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s", " at version %" PRId64 " since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
return code; return code;
} }
......
...@@ -1169,16 +1169,28 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void ...@@ -1169,16 +1169,28 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
tDecoderInit(&decoder, pReq, len); tDecoderInit(&decoder, pReq, len);
tDecodeSBatchDeleteReq(&decoder, &deleteReq); tDecodeSBatchDeleteReq(&decoder, &deleteReq);
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i); SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); char *name = pOneReq->tbname;
if (metaGetTableEntryByName(&mr, name) < 0) {
vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
continue;
}
int64_t uid = mr.me.uid;
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
if (code < 0) { if (code < 0) {
terrno = code; terrno = code;
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
} }
} }
metaReaderClear(&mr);
taosArrayDestroy(deleteReq.deleteReqs); taosArrayDestroy(deleteReq.deleteReqs);
return 0; return 0;
} }
......
...@@ -184,7 +184,6 @@ enum { ...@@ -184,7 +184,6 @@ enum {
typedef struct SOperatorFpSet { typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow; __optr_encode_fn_t encodeResultRow;
......
...@@ -114,7 +114,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, ...@@ -114,7 +114,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
SOperatorFpSet fpSet = { SOperatorFpSet fpSet = {
._openFn = openFn, ._openFn = openFn,
.getNextFn = nextFn, .getNextFn = nextFn,
.getStreamResFn = streamFn,
.cleanupFn = cleanup, .cleanupFn = cleanup,
.closeFn = closeFn, .closeFn = closeFn,
.getExplainFn = explain, .getExplainFn = explain,
...@@ -1984,7 +1983,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1984,7 +1983,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
} }
...@@ -2260,7 +2259,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -2260,7 +2259,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL); createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
doDestroyExchangeOperatorInfo(pInfo); doDestroyExchangeOperatorInfo(pInfo);
} }
...@@ -3428,7 +3427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3428,7 +3427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
} }
#endif #endif
} }
...@@ -3461,7 +3460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3461,7 +3460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { for (int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) {
STableKeyInfo* p = taosArrayGet(pList, i); STableKeyInfo* p = taosArrayGet(pList, i);
tableListAddTableInfo(pTableListInfo, p->uid, 0); tableListAddTableInfo(pTableListInfo, p->uid, 0);
} }
...@@ -3712,7 +3711,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32 ...@@ -3712,7 +3711,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32
*length = *(int32_t*)(*result); *length = *(int32_t*)(*result);
} }
_downstream: _downstream:
for (int32_t i = 0; i < ops->numOfDownstream; ++i) { for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
if (code != TDB_CODE_SUCCESS) { if (code != TDB_CODE_SUCCESS) {
......
...@@ -1691,14 +1691,14 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1691,14 +1691,14 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
pOperator->name = "FillOperator"; pOperator->name = "StreamFillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, pOperator->fpSet =
NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "filter.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "tcommon.h" #include "tcommon.h"
...@@ -986,7 +986,7 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf ...@@ -986,7 +986,7 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
// current result is done in computing final results. // current result is done in computing final results.
if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pResult); closeResultRow(pResult);
SListNode *pNode = tdListPopHead(pResultRowInfo->openWindow); SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
} }
...@@ -1255,9 +1255,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1255,9 +1255,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pInfo->binfo.pRes; SSDataBlock* pBlock = pInfo->binfo.pRes;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH);
return pOperator->fpSet.getStreamResFn(pOperator);
} else {
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
...@@ -1283,7 +1282,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1283,7 +1282,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pBlock; return (rows == 0) ? NULL : pBlock;
}
} }
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
......
...@@ -32,8 +32,6 @@ typedef struct { ...@@ -32,8 +32,6 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
......
...@@ -240,43 +240,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { ...@@ -240,43 +240,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
return 0; return 0;
} }
#if 0
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = pTask->inputStatus;
pCont->streamId = pTask->streamId;
pCont->reqTaskId = pTask->taskId;
pCont->rspTaskId = pReq->upstreamTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp);
tmsgSendRsp(pRsp);
return 0;
}
int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
streamProcessRunReq(pTask);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100, true) < 0) {
return -1;
}
} else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL;
}
return 0;
}
#endif
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
......
...@@ -138,63 +138,39 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -138,63 +138,39 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
} }
#if 0 #if 0
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) { int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); // fetch all queue item, merge according to batchLimit
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
void* exec = pTask->exec.executor; if (numOfItems == 0) {
qDebug("task: %d, stream task exec over, queue empty", pTask->taskId);
while (1) { return 0;
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t batchCnt = 0;
while (1) {
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0);
}
if (output == NULL) break;
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
if (++batchCnt >= batchNum) break;
} }
if (taosArrayGetSize(pRes) == 0) { SStreamQueueItem* pMerged = NULL;
taosArrayDestroy(pRes); SStreamQueueItem* pItem = NULL;
break; taosGetQitem(pTask->inputQall, (void**)&pItem);
if (pItem == NULL) {
if (pMerged != NULL) {
// process merged item
} else {
return 0;
} }
if (dispatch) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
} }
qRes->type = STREAM_INPUT__DATA_BLOCK; // if drop
qRes->blocks = pRes; if (pItem->type == STREAM_INPUT__DESTROY) {
qRes->childId = pTask->selfChildId; // set status drop
if (streamTaskOutput(pTask, qRes) < 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return -1; return -1;
} }
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
streamDispatch(pTask); ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
} streamTaskOutput(pTask, (SStreamDataBlock*)pItem);
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
} }
// exec impl
// output
// try dispatch
return 0; return 0;
} }
#endif #endif
......
...@@ -482,7 +482,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -482,7 +482,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType)); wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code < 0) { if (code < 0) {
......
...@@ -208,7 +208,7 @@ STaosQall *taosAllocateQall() { ...@@ -208,7 +208,7 @@ STaosQall *taosAllocateQall() {
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
int32_t code = 0; int32_t numOfItems = 0;
bool empty; bool empty;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
...@@ -219,13 +219,14 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { ...@@ -219,13 +219,14 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
qall->current = queue->head; qall->current = queue->head;
qall->start = queue->head; qall->start = queue->head;
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
code = qall->numOfItems; numOfItems = qall->numOfItems;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
queue->memOfItems = 0; queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
queue->memOfItems);
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
} }
...@@ -237,7 +238,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { ...@@ -237,7 +238,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
qall->start = NULL; qall->start = NULL;
qall->numOfItems = 0; qall->numOfItems = 0;
} }
return code; return numOfItems;
} }
int32_t taosGetQitem(STaosQall *qall, void **ppItem) { int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册