未验证 提交 930613db 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14235 from taosdata/feature/stream

fix(stream): stream rsp to retrieve msg if data is empty
......@@ -47,6 +47,7 @@ typedef enum EStreamType {
STREAM_GET_ALL,
STREAM_DELETE,
STREAM_RETRIEVE,
STREAM_PUSH_DATA,
} EStreamType;
typedef struct {
......
......@@ -58,6 +58,7 @@ enum {
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
......
......@@ -111,7 +111,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// enqueue
if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->type = STREAM_INPUT__DATA_BLOCK;
pData->srcVgId = pReq->dataSrcVgId;
// decode
/*pData->blocks = pReq->data;*/
......@@ -146,7 +146,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue
if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
// decode
/*pData->blocks = pReq->data;*/
......@@ -170,7 +170,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
pCont->rspToTaskId = pReq->srcTaskId;
pCont->rspFromTaskId = pReq->dstTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
......
......@@ -300,7 +300,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qInfo("stream continue dispatching: task %d", pTask->taskId);
......
......@@ -17,6 +17,7 @@
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.executor;
bool hasData = false;
// set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
......@@ -27,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
......@@ -43,7 +44,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE && !hasData) {
SSDataBlock block = {0};
block.info.type = STREAM_PUSH_DATA;
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
}
break;
}
hasData = true;
if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) {
......
......@@ -42,6 +42,9 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
}
}
walRemoveMeta(pWal);
......@@ -105,7 +108,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
taosThreadMutexUnlock(&pWal->mutex);
......@@ -126,7 +129,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) {
// TODO
taosThreadMutexUnlock(&pWal->mutex);
......@@ -307,8 +310,8 @@ int walRoll(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册