提交 32fddeff 编写于 作者: H Haojun Liao

fix(stream): fix error retrieve data from source task.And optimize the test cases.

上级 3289ad62
......@@ -160,7 +160,7 @@ static const SSysDbTableSchema streamSchema[] = {
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......
......@@ -1221,12 +1221,16 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
if (pShow->pIter == NULL) {
break;
}
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
......@@ -1236,10 +1240,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
......@@ -1249,12 +1255,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false);
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
......@@ -1283,8 +1296,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
......
......@@ -1203,8 +1203,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug("s-task:%s without associated stream task, reset the time window:%" PRId64 " - %" PRId64, pId,
pWindow->skey, pWindow->ekey);
tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey,
pWindow->ekey);
} else {
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
......
......@@ -207,7 +207,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
......@@ -235,7 +235,6 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t status = pTask->status.taskStatus;
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->info.taskLevel);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -261,36 +260,44 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
// append the data for the stream
SStreamQueueItem* pItem = NULL;
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// delete ignore
if (pItem == NULL) {
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
noNewDataInWal = false;
noDataInWal = false;
if (pItem != NULL) {
code = tAppendDataToInputQueue(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer);
}
}
code = tqAddInputBlockNLaunchTask(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) {
code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
return -1;
}
}
streamMetaReleaseTask(pStreamMeta, pTask);
}
// all wal are checked, and no new data available in wal.
if (noNewDataInWal) {
if (noDataInWal) {
*pScanIdle = true;
}
......
......@@ -131,10 +131,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else {
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks);
qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
......
......@@ -113,6 +113,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
return -1;
}
......@@ -170,8 +171,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue
if (pData != NULL) {
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
pReq->srcTaskId, pReq->reqId);
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
......@@ -308,9 +309,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("s-task:%s receive retrieve req from taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->srcTaskId, pReq->srcNodeId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
streamSchedExec(pTask);
return 0;
......
......@@ -111,7 +111,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
numOfBlocks += 1;
qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId,
pRetrieveBlock->reqId);
}
......
......@@ -575,7 +575,6 @@ endi
$loop_count = 0
print step 7
sql create database test3 vgroups 6;
sql use test3;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
......
......@@ -44,7 +44,7 @@ class TDTestCase:
new_dbname = list(dbname)
new_dbname.insert(i,j)
dbname_1 = ''.join(new_dbname)
tdSql.execute(f'create database if not exists `{dbname_1}`')
tdSql.execute(f'create database if not exists `{dbname_1}` vgroups 1 replica 1')
tdSql.query('select * from information_schema.ins_databases')
tdSql.checkEqual(tdSql.queryResult[2][0],str(dbname_1))
tdSql.execute(f'drop database `{dbname_1}`')
......@@ -56,7 +56,7 @@ class TDTestCase:
def tb_name_check(self):
dbname = tdCom.getLongName(10)
tdSql.execute(f'create database if not exists `{dbname}`')
tdSql.execute(f'create database if not exists `{dbname}` vgroups 1 replica 1')
tdSql.execute(f'use `{dbname}`')
tbname = tdCom.getLongName(5)
for i in self.special_name:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册