提交 79d60495 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-14481-3.0

......@@ -101,8 +101,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
......@@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
......@@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
......
......@@ -25,6 +25,8 @@ extern "C" {
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
typedef struct SStreamTask SStreamTask;
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
......@@ -69,20 +71,24 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int8_t reserved;
int64_t stbUid;
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct {
int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
FSmaSink* smaSink;
} STaskSinkSma;
typedef struct {
......@@ -115,7 +121,7 @@ enum {
TASK_SINK__FETCH,
};
typedef struct {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t status;
......@@ -150,8 +156,7 @@ typedef struct {
// application storage
void* ahandle;
} SStreamTask;
};
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
......
......@@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i;
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
......@@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS;
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
int32_t vgId) {
SSubmitReq* ret = NULL;
// cal size
......@@ -1608,13 +1609,37 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
ret = taosMemoryCalloc(1, cap);
ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = htonl(cap - sizeof(SSubmitReq));
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
......@@ -1623,19 +1648,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->schemaLen = 0;
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = 0;
blkHead->uid = htobe64(pDataBlock->info.uid);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
STSRow* rowData = blockData;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
tEncoderClear(&encoder);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
......@@ -1653,10 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
rowData = POINTER_SHIFT(rowData, rowLen);
blkHead->dataLen += rowLen;
}
int32_t len = blkHead->dataLen;
blkHead->dataLen = htonl(len);
blkHead = POINTER_SHIFT(blkHead, len);
int32_t dataLen = blkHead->dataLen;
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
/*submitBlk = blkHead;*/
}
ret->length = htonl(ret->length);
return ret;
}
......@@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0);
}
......@@ -4013,4 +4012,4 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
tEndDecode(pCoder);
return 0;
}
\ No newline at end of file
}
......@@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
......
......@@ -51,6 +51,7 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs;
pMgmt->state = pInfo->vstat;
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
taosArrayDestroy(vloads.pVloads);
}
......@@ -177,6 +178,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
......
......@@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
......@@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) {
pThread->failed++;
} else {
vmOpenVnode(pMgmt, pCfg, pImpl);
//vnodeStart(pImpl);
// vnodeStart(pImpl);
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
}
......
......@@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
pMsg->rpcMsg = *pRpc;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;
case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
......@@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
return code;
}
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
}
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
}
......
......@@ -574,6 +574,7 @@ typedef struct {
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime;
int64_t updateTime;
int64_t uid;
......
......@@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
......@@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
......@@ -529,4 +535,4 @@ void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
\ No newline at end of file
}
......@@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
......@@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
......@@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif
......
......@@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER;
}
stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
return 0;
......@@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
......
......@@ -967,7 +967,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action);
}
}
......@@ -1043,7 +1043,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode;
}
} else {
mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions);
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
......
......@@ -141,16 +141,19 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
goto _err;
}
// preprocess req
pReq->uid = tGenIdPI64();
pReq->ctime = taosGetTimestampMs();
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
pReq->uid = mr.me.uid;
if (pReq->type == TSDB_CHILD_TABLE) {
pReq->ctb.suid = mr.me.ctbEntry.suid;
}
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
} else {
pReq->uid = tGenIdPI64();
pReq->ctime = taosGetTimestampMs();
}
metaReaderClear(&mr);
......
......@@ -884,24 +884,48 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0;
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pReq,
.contLen = ntohl(pReq->length),
};
pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
}
for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
}
}
for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
}
return 0;
}
......@@ -925,7 +949,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
// sink
pTask->ahandle = pTq->pVnode;
if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle = smaHandleRes;
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
......
......@@ -76,6 +76,8 @@ struct SMemSkipListCurosr {
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL;
......@@ -176,20 +178,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// do insert data to SMemData
SMemSkipListCurosr slc = {0};
const uint8_t *p = pSubmitBlk->pData;
const uint8_t *pt;
const STSRow *pRow;
uint64_t szRow;
uint32_t szRow;
SDecoder decoder = {0};
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData);
for (;;) {
// if (tDecodeIsEnd(&coder)) break;
if (tDecodeIsEnd(&decoder)) break;
if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
// check the row (todo)
// // move the cursor to position to write (todo)
......@@ -197,11 +198,16 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
// ASSERT(c);
// // encode row
// int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
// int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
// pSlNode = vnodeBufPoolMalloc(pPool, tsize);
// pSlNode->level = level;
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/);
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
// uint8_t *pData = SL_NODE_DATA(pSlNode);
// *(int64_t *)pData = version;
......@@ -215,7 +221,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
}
// tCoderClear(&coder);
tDecoderClear(&decoder);
// tsdbMemSkipListCursorClose(&slc);
// update status
......@@ -228,4 +234,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0;
}
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel;
const uint32_t factor = 4;
if (pSl->size) {
tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
}
return level;
}
\ No newline at end of file
......@@ -38,7 +38,8 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info;
......@@ -189,7 +191,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
bool allColumnsHaveAgg = true;
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
......@@ -261,7 +263,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
......@@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
"-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
......@@ -373,22 +376,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
pInfo->cond = *pCond;
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio;
pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio;
pInfo->dataBlockLoadFlag = dataLoadFlag;
pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition;
pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator"; // for dubug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition;
pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator"; // for dubug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
......@@ -404,17 +407,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1;
pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator";
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
return pOperator;
......@@ -514,18 +517,18 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts+i);
taosArrayPush(pInfo->tsArray, ts + i);
}
}
if (taosArrayGetSize(pInfo->tsArray) > 0) {
//TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
// TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
return NULL;
}
return NULL;
......@@ -535,7 +538,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
int32_t rows = 0;
int32_t rows = 0;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
......@@ -571,7 +574,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t numOfRows = 0;
int16_t outputCol = 0;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol);
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code;
......@@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t* id = taosArrayGet(pColList, i);
taosArrayPush(pColIds, id);
SColMatchInfo* id = taosArrayGet(pColList, i);
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
}
pInfo->pColMatchInfo = pColList;
......@@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
return NULL;
}
pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -687,25 +691,26 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
}
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
_error:
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
......@@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
if (NULL == pCondition) {
return;
}
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName);
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
}
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
......@@ -809,7 +814,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
......@@ -853,13 +858,13 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
static SSDataBlock* buildSysTableMetaBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
const SSysTableMeta *pMeta = NULL;
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
for(int32_t i = 0; i < size; ++i) {
if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
for (int32_t i = 0; i < size; ++i) {
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
index = i;
break;
}
......@@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < pMeta[index].colNum; ++i) {
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = {0};
colInfoData.info.colId = i + 1;
colInfoData.info.type = pMeta[index].schema[i].type;
......@@ -1091,7 +1096,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
SSDataBlock* p = buildSysTableMetaBlock();
blockDataEnsureCapacity(p, capacity);
size_t size = 0;
size_t size = 0;
const SSysTableMeta* pSysDbTableMeta = NULL;
getInfosDbMeta(&pSysDbTableMeta, &size);
......@@ -1100,18 +1105,19 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
// blockDataDestroy(p); todo handle memory leak
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
// blockDataDestroy(p); todo handle memory leak
pInfo->pRes->info.rows = p->info.rows;
return p->info.rows;
}
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) {
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName) {
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t numOfRows = p->info.rows;
for(int32_t i = 0; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
const SSysTableMeta* pm = &pSysDbTableMeta[i];
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
......@@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);
for(int32_t j = 4; j <= 8; ++j) {
for (int32_t j = 4; j <= 8; ++j) {
pColInfoData = taosArrayGet(p->pDataBlock, j);
colDataAppendNULL(pColInfoData, numOfRows);
}
......@@ -1160,18 +1166,18 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
return NULL;
}
pInfo->accountId = accountId;
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096);
tNameAssign(&pInfo->name, pName);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*) readHandle;
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
......@@ -1201,14 +1207,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
#endif
}
pOperator->name = "SysTableScanOperator";
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -1355,26 +1361,27 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pTableGroups = pTableGroupInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pInfo->pTableGroups = pTableGroupInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
......
#include "ttime.h"
#include "tdatablock.h"
#include "executorimpl.h"
#include "functionMgt.h"
#include "tdatablock.h"
#include "ttime.h"
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
......@@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
}
}
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep,
int32_t order, bool timeWindowInterpo) {
......@@ -759,10 +758,10 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -808,7 +807,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
int64_t gid = pBlock->info.groupId;
......@@ -932,7 +931,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -981,10 +980,10 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
}
}
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
for ( int i = 0; i < num; i++) {
for (int i = 0; i < num; i++) {
if (type == STREAM_INVERT) {
fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
} else if (type == STREAM_NORMAL){
} else if (type == STREAM_NORMAL) {
fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
}
}
......@@ -992,7 +991,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
ASSERT(pInfo->binfo.pRes->info.rows > 0);
// TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
......@@ -1070,17 +1070,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
// pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp;
pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
......@@ -1099,14 +1099,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "TimeIntervalAggOperator";
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
......@@ -1118,7 +1118,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return pOperator;
_error:
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -1131,7 +1131,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
......@@ -1177,7 +1177,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
return pOperator;
_error:
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -1321,7 +1321,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
return pSliceInfo->binfo.pRes;
}
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -1379,7 +1379,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
_error:
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1402,18 +1402,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->twAggSup = *pTwAggSup;
pInfo->twAggSup = *pTwAggSup;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
......@@ -1421,7 +1421,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
_error:
pTaskInfo->code = TSDB_CODE_SUCCESS;
return NULL;
}
......@@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1453,18 +1453,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pInfo->tsSlotId = tsSlotId;
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
......@@ -1473,7 +1473,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
_error:
if (pInfo != NULL) {
destroySWindowOperatorInfo(pInfo, numOfCols);
}
......@@ -1482,4 +1482,4 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
\ No newline at end of file
}
......@@ -432,7 +432,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * (numOfRows - numOfNulls);
} else {
inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
inputLen += (pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE) * factor;
}
}
......@@ -510,7 +510,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} else if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * (numOfRows - numOfNulls) * factor;
} else {
inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
inputLen += (pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE) * factor;
}
}
......
......@@ -150,14 +150,14 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
pRes = (SArray*)input;
}
if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
......@@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
......@@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
......
......@@ -442,4 +442,4 @@ TEST(td_encode_test, compound_struct_encode_test) {
#endif
#pragma GCC diagnostic pop
#endif
\ No newline at end of file
#endif
......@@ -59,6 +59,10 @@
# ---- table
./test.sh -f tsim/table/basic1.sim
# ---- tstream
./test.sh -f tsim/tstream/basic0.sim
# ---- tmq
./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/basic2.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0 vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, k int) tags (a int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 3 then
return -1
endi
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql show stables
if $rows != 2 then
return -1
endi
print =============== insert data
sql insert into ct1 values('2022-05-08 03:42:00.000', 234)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data01 != 234 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 234 then
return -1
endi
#===================================================================
print =============== insert data
sql insert into ct1 values('2022-05-08 03:43:00.000', -111)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data01 != -111 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 123 then
return -1
endi
#===================================================================
print =============== insert data
sql insert into ct1 values('2022-05-08 03:53:00.000', 789)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
if $rows != 2 then
return -1
endi
if $data01 != -111 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 123 then
return -1
endi
if $data11 != 789 then
return -1
endi
if $data12 != 789 then
return -1
endi
if $data13 != 789 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册