提交 58122d07 编写于 作者: L Liu Jicong

add stream trigger

上级 d1363d9c
......@@ -77,7 +77,8 @@ int32_t create_stream() {
}
taos_free_result(pRes);
const char* sql = "select sum(k) from tu1";
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) from st1";
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) {
......
......@@ -191,6 +191,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
......@@ -2872,7 +2872,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
}
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
......@@ -2884,12 +2884,12 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder);
/*tEndEncode(pEncoder);*/
return pEncoder->pos;
}
int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
......@@ -2901,7 +2901,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder);
/*tEndDecode(pDecoder);*/
return 0;
}
......
......@@ -277,6 +277,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
......
......@@ -117,7 +117,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t opNum = LIST_LENGTH(inner->pNodeList);
ASSERT(opNum == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, level);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (level == 0) {
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL;
......
......@@ -176,7 +176,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg*
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
......@@ -184,6 +184,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,8 @@
#include "tqInt.h"
#include "tqMetaStore.h"
void tqDebugShowSSData(SArray* dataBlocks);
int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); }
......@@ -70,59 +72,19 @@ void tqClose(STQ* pTq) {
// TODO
}
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->pipeSource) continue;
int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor;
qSetStreamInput(exec, msg, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
if (pTask->pipeSink) {
// write back
} else {
int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
tEncodeDataBlocks(abuf, pRes);
tmsg_t type;
if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
type = TDMT_VND_TASK_EXEC;
} else {
type = TDMT_SND_TASK_EXEC;
}
SRpcMsg msg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = type,
};
tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &msg);
}
void* data = malloc(msgLen);
if (data == NULL) {
return -1;
}
memcpy(data, msg, msgLen);
SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER,
.pCont = data,
.contLen = msgLen,
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#if 0
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
......@@ -577,7 +539,11 @@ void tqDebugShowSSData(SArray* dataBlocks) {
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15u |", *(uint32_t*)var);
printf(" %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15ld |", *(int64_t*)var);
break;
}
}
......@@ -586,6 +552,62 @@ void tqDebugShowSSData(SArray* dataBlocks) {
}
}
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->pipeSource) continue;
int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor;
qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
if (pTask->pipeSink) {
// write back
/*printf("reach end\n");*/
tqDebugShowSSData(pRes);
} else {
int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
tEncodeDataBlocks(abuf, pRes);
tmsg_t type;
if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
type = TDMT_VND_TASK_EXEC;
} else {
type = TDMT_SND_TASK_EXEC;
}
SRpcMsg reqMsg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = type,
};
tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg);
}
}
return 0;
}
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
SStreamTaskExecReq* pReq = msg->pCont;
......
......@@ -126,6 +126,36 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if (pArray == NULL) {
return NULL;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
int16_t colIdSchema = pColSchema->colId;
int16_t colIdNeed = *(int16_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = calloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
taosArrayPush(pArray, &colInfo);
colMeta++;
colNeed++;
}
}
int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) {
......@@ -163,11 +193,23 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray);
for (int32_t i = 0; i < colTot; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pColData->info.bytes), sVal.val, pColData->info.bytes);
}
#if 0
for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i);
// TODO
ASSERT(pCol->colId == pColData->info.colId);
if(pCol->colId != pColData->info.colId) {
continue;
}
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
......@@ -176,6 +218,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
}
#endif
curRow++;
}
return pArray;
......
......@@ -68,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:
......@@ -163,7 +165,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
}
_exit:
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
......@@ -179,7 +180,6 @@ _exit:
}
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
tFreeSTableMetaRsp(&metaRsp);
if (pSW != NULL) {
tfree(pSW->pSchema);
......
......@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// todo: change the interface here
int64_t ver;
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) {
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
// TODO: handle error
}
......@@ -85,10 +85,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
for (int i = 0; i < reqNum; i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
char tableFName[TSDB_TABLE_FNAME_LEN];
char tableFName[TSDB_TABLE_FNAME_LEN];
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
if (code) {
SVCreateTbRsp rsp;
......@@ -96,7 +96,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
......@@ -116,10 +116,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayDestroy(vCreateTbBatchReq.pArray);
if (vCreateTbBatchRsp.rspList) {
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
void *msg = rpcMallocCont(contLen);
void *msg = rpcMallocCont(contLen);
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
taosArrayDestroy(vCreateTbBatchRsp.rspList);
*pRsp = calloc(1, sizeof(SRpcMsg));
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
(*pRsp)->pCont = msg;
......
......@@ -30,7 +30,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册