提交 8c5dbde9 编写于 作者: L Liu Jicong

refactor(stream): stream reader created in scanner

上级 9355392c
......@@ -137,8 +137,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -199,7 +199,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
......
......@@ -2826,8 +2826,8 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
// tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
// tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
......@@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
// buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
// buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
......
......@@ -30,13 +30,15 @@ struct SRpcMsg;
struct SSubplan;
typedef struct SReadHandle {
void* reader;
void* streamReader;
void* meta;
void* config;
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
bool tqReader;
bool initMetaReader;
bool initTableReader;
bool initStreamReader;
} SReadHandle;
typedef enum {
......
......@@ -223,7 +223,7 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask {
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t isDataScan;
......@@ -235,6 +235,11 @@ struct SStreamTask {
int8_t taskStatus;
int8_t execStatus;
// exec info
int64_t enqueueVer;
int64_t processedVer;
int64_t checkpointVer;
// node info
int32_t selfChildId;
int32_t nodeId;
......@@ -277,7 +282,7 @@ struct SStreamTask {
// msg handle
SMsgCb* pMsgCb;
};
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
......@@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
......@@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
ASSERT(0);
}
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
......@@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
......
......@@ -178,7 +178,6 @@ void walFsync(SWal *, bool force);
// apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver);
......@@ -207,10 +206,11 @@ void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *);
// help function for raft
bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *);
// lifecycle check
bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
......
......@@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
......
......@@ -49,19 +49,18 @@ struct tmq_list_t {
};
struct tmq_conf_t {
char clientId[256];
char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
int8_t spEnable;
int32_t spBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
/*char* db;*/
char clientId[256];
char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
int8_t spEnable;
int32_t spBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
};
......@@ -337,7 +336,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container;
char* topic = strdup(src);
char* topic = strDupUnquo(src);
if (taosArrayPush(container, &topic) == NULL) return -1;
return 0;
}
......
......@@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
}
SReadHandle handle = {
.reader = pReadHandle,
.streamReader = pReadHandle,
.meta = pMeta,
.pMsgCb = pMsgCb,
.vnode = pVnode,
......
......@@ -28,8 +28,12 @@ int32_t tqInit() {
atomic_store_8(&tqMgmt.inited, 0);
return -1;
}
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1);
}
return 0;
}
......@@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version;
metaRsp.rspOffset = fetchVer;
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
......@@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i],
.streamReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.tqReader = true,
.initTableReader = true,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
......@@ -522,11 +529,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
if (pTask->isDataScan) {
SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
/*SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);*/
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initStreamReader = 1,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
......@@ -84,7 +84,7 @@ int32_t tqMetaOpen(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = {
.reader = handle.execHandle.pExecReader[i],
.streamReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
};
......
......@@ -338,7 +338,7 @@ typedef struct SessionWindowSupporter {
uint8_t parentType;
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
......@@ -355,7 +355,7 @@ typedef struct SStreamBlockScanInfo {
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle
void* streamReader;// stream block reader handle
int32_t tsArrayIndex;
SArray* tsArray;
......@@ -364,7 +364,7 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pSnapshotReadOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
......@@ -373,7 +373,7 @@ typedef struct SStreamBlockScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
} SStreamBlockScanInfo;
} SStreamScanInfo;
typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp;
......
......@@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else {
pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info;
SStreamScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed,
......@@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo;
}
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) {
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table.
......@@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo = pInfo->pDownstream[0];
}
int32_t code = 0;
SStreamBlockScanInfo* pScanInfo = pInfo->info;
int32_t code = 0;
SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
taosArrayDestroy(qa);
} else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList);
code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
}
return code;
......
......@@ -2847,12 +2847,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info;
SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->pSnapshotReadOp->status = OP_OPENED;
pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) {
......@@ -2912,8 +2912,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts;
} else {
......@@ -4537,9 +4537,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
}
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pSnapshotReadOp->info;
SStreamScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pTableScanOp->info;
return 0;
}
}
......
......@@ -788,7 +788,7 @@ _error:
return NULL;
}
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0;
......@@ -799,11 +799,11 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
static bool isSessionWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
static bool isStateWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
......@@ -828,23 +828,21 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
*/
}
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type)
{
case STREAM_DELETE_DATA:
case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
pInfo->groupId = groupCol[rowIndex];
}
break;
default:
break;
switch (pBlock->info.type) {
case STREAM_DELETE_DATA:
case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
pInfo->groupId = groupCol[rowIndex];
} break;
default:
break;
}
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
......@@ -864,11 +862,10 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
win = pCurWin->win;
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
win =
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
(*pRowIndex) +=
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
}
needRead = true;
} else if (isStateWindow(pInfo)) {
......@@ -886,7 +883,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
if (!needRead) {
return false;
}
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
......@@ -911,14 +908,14 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++;
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp);
pResult = doTableScan(pInfo->pTableScanOp);
if (pResult == NULL) {
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp);
pResult = doTableScan(pInfo->pTableScanOp);
}
}
if (!pResult) {
......@@ -943,7 +940,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
*/
}
static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) {
return;
}
......@@ -951,18 +949,20 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
blockDataEnsureCapacity(pUpdateRes, 64);
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) {
for (int32_t i = pInfo->deleteDataIndex;
i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) {
for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
pUpdateRes->info.rows++;
......@@ -977,7 +977,7 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
}
}
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray);
if (pInfo->tsArrayIndex < size) {
......@@ -986,11 +986,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
int32_t i = 0;
for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
if (pInfo->groupId != id) {
break;
}
......@@ -1009,12 +1009,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
}
if (size == 0) {
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
}
}
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
bool out) {
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
......@@ -1030,15 +1029,15 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
ASSERT(pBlock->info.rows > 0);
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uidCol[i] = getGroupId(pOperator, uidCol[i]);
}
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
......@@ -1056,30 +1055,29 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
// TODO move into scan
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_RETRIEVE:{
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
}
break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
pInfo->updateResIndex = 0;
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
}
break;
default:
break;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
pInfo->updateResIndex = 0;
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
} break;
default:
break;
}
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
......@@ -1128,11 +1126,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->streamBlockReader)) {
while (tqNextDataBlock(pInfo->streamReader)) {
SSDataBlock block = {0};
// todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
uint64_t groupId = block.info.groupId;
uint64_t uid = block.info.uid;
......@@ -1228,11 +1226,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else {
......@@ -1256,8 +1256,8 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -1295,32 +1295,40 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
if (pHandle) {
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->tqReader) {
if (pHandle->initTableReader) {
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pSTInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);
}
if (pHandle->initStreamReader) {
ASSERT(pHandle->streamReader == NULL);
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
ASSERT(pInfo->streamReader);
} else {
ASSERT(pHandle->streamReader);
pInfo->streamReader = pHandle->streamReader;
}
if (pSTInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
pInfo->pUpdateInfo = NULL;
}
pInfo->pSnapshotReadOp = pTableScanDummy;
pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle;
ASSERT(pHandle->reader);
pInfo->streamBlockReader = pHandle->reader;
pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle
tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
tqReadHandleSetColIdList((SStreamReader*)pHandle->streamReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
int32_t code = tqReadHandleSetTbUidList(pHandle->streamReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
......@@ -1344,7 +1352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator";
pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
......@@ -1353,7 +1361,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
......
......@@ -2980,7 +2980,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamBlockScanInfo* pScanInfo = downstream->info;
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type};
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
}
......@@ -3464,8 +3464,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
SArray* pClosed, __get_win_info_ fn, bool delete) {
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
bool delete) {
// Todo(liuyao) save window to tdb
void** pIte = NULL;
size_t keyLen = 0;
......@@ -3604,8 +3604,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreExpiredData);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
......@@ -4097,8 +4097,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreExpiredData);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);
......
......@@ -17,6 +17,7 @@
#define _STREAM_INC_H_
#include "executor.h"
#include "tref.h"
#include "tstream.h"
#ifdef __cplusplus
......@@ -24,8 +25,9 @@ extern "C" {
#endif
typedef struct {
int8_t inited;
void* timer;
int8_t inited;
int32_t refPool;
void* timer;
} SStreamGlobalEnv;
static SStreamGlobalEnv streamEnv;
......
......@@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
if (streamInit() < 0) {
return -1;
}
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
}
......
......@@ -79,13 +79,13 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
int64_t code;
char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
// find correct file
if (ver < walGetLastFileFirstVer(pWal)) {
......
......@@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;
......
......@@ -25,7 +25,7 @@ class TDTestCase:
paraDict = {'dbName': 'db2',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
......@@ -44,7 +44,7 @@ class TDTestCase:
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册