提交 81ed902d 编写于 作者: L Liu Jicong

fix(stream): stream rsp to retrieve msg if data is empty

上级 5040cb95
...@@ -47,6 +47,7 @@ typedef enum EStreamType { ...@@ -47,6 +47,7 @@ typedef enum EStreamType {
STREAM_GET_ALL, STREAM_GET_ALL,
STREAM_DELETE, STREAM_DELETE,
STREAM_RETRIEVE, STREAM_RETRIEVE,
STREAM_PUSH_DATA,
} EStreamType; } EStreamType;
typedef struct { typedef struct {
...@@ -71,7 +72,7 @@ typedef struct SColumnDataAgg { ...@@ -71,7 +72,7 @@ typedef struct SColumnDataAgg {
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
STimeWindow window; STimeWindow window;
int32_t rows; // todo hide this attribute int32_t rows; // todo hide this attribute
int32_t rowSize; int32_t rowSize;
uint64_t uid; // the uid of table, from which current data block comes uint64_t uid; // the uid of table, from which current data block comes
uint16_t blockId; // block id, generated by physical planner uint16_t blockId; // block id, generated by physical planner
......
...@@ -58,6 +58,7 @@ enum { ...@@ -58,6 +58,7 @@ enum {
enum { enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER, STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP, STREAM_INPUT__DROP,
......
...@@ -111,7 +111,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -111,7 +111,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pData->type = STREAM_INPUT__DATA_BLOCK;
pData->srcVgId = pReq->dataSrcVgId; pData->srcVgId = pReq->dataSrcVgId;
// decode // decode
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
...@@ -146,7 +146,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -146,7 +146,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0; pData->srcVgId = 0;
// decode // decode
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
...@@ -170,7 +170,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -170,7 +170,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
pCont->rspToTaskId = pReq->srcTaskId; pCont->rspToTaskId = pReq->srcTaskId;
pCont->rspFromTaskId = pReq->dstTaskId; pCont->rspFromTaskId = pReq->dstTaskId;
pRsp->pCont = buf; pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
......
...@@ -300,7 +300,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -300,7 +300,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0; return 0;
} }
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qInfo("stream continue dispatching: task %d", pTask->taskId); qInfo("stream continue dispatching: task %d", pTask->taskId);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
bool hasData = false;
// set input // set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data; SStreamQueueItem* pItem = (SStreamQueueItem*)data;
...@@ -27,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -27,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(pTask->isDataScan); ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
...@@ -43,7 +44,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -43,7 +44,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false); ASSERT(false);
} }
if (output == NULL) break; if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE && !hasData) {
SSDataBlock block = {0};
block.info.type = STREAM_PUSH_DATA;
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
}
break;
}
hasData = true;
if (output->info.type == STREAM_RETRIEVE) { if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) { if (streamBroadcastToChildren(pTask, output) < 0) {
......
...@@ -42,6 +42,9 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { ...@@ -42,6 +42,9 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
} }
} }
walRemoveMeta(pWal); walRemoveMeta(pWal);
...@@ -105,7 +108,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -105,7 +108,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -126,7 +129,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -126,7 +129,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ); TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
// TODO // TODO
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -307,8 +310,8 @@ int walRoll(SWal *pWal) { ...@@ -307,8 +310,8 @@ int walRoll(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/ int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/ wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry)); int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册