提交 9392c03b 编写于 作者: H Haojun Liao

fix(stream): add some logs.

上级 13f3ca42
......@@ -167,9 +167,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
SBatchDeleteReq* deleteReq);
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer);
......
......@@ -250,7 +250,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, "");
continue;
}
......
......@@ -637,7 +637,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
int32_t ver1 = 1;
SMetaInfo info = {0};
......
......@@ -17,23 +17,24 @@
#include "tmsg.h"
#include "tq.h"
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
SBatchDeleteReq* deleteReq) {
int32_t totRow = pDataBlock->info.rows;
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
int32_t totalRows = pDataBlock->info.rows;
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
tqDebug("stream delete msg: row %d", totRow);
tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
for (int32_t row = 0; row < totRow; row++) {
int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row);
for (int32_t row = 0; row < totalRows; row++) {
int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name;
void* varTbName = NULL;
if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) {
if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
varTbName = colDataGetVarData(pTbNameCol, row);
}
......@@ -43,31 +44,17 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
} else {
name = buildCtbNameByGroupId(stbFullName, groupId);
}
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
pVnode->config.vgId, groupId, name, startTs, endTs);
#if 0
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, name) < 0) {
metaReaderClear(&mr);
tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
taosMemoryFree(name);
continue;
}
int64_t uid = mr.me.uid;
metaReaderClear(&mr);
taosMemoryFree(name);
#endif
SSingleDeleteReq req = {
.startTs = startTs,
.endTs = endTs,
};
tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64,
pIdStr, groupId, name, skey, ekey);
SSingleDeleteReq req = { .startTs = skey, .endTs = ekey};
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
taosMemoryFree(name);
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
taosArrayPush(deleteReq->deleteReqs, &req);
}
return 0;
}
......@@ -108,12 +95,7 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = {
.msgType = TDMT_VND_CREATE_TABLE,
.pCont = buf,
.contLen = tlen,
};
SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqError("failed to put into write-queue since %s", terrstr());
}
......@@ -121,13 +103,12 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
return TSDB_CODE_SUCCESS;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema;
/*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/
int32_t blockSz = taosArrayGetSize(pBlocks);
......@@ -141,11 +122,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0};
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
deleteReq.suid = suid;
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
taosArrayDestroy(deleteReq.deleteReqs);
continue;
......@@ -154,10 +135,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t len;
int32_t code;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
if (code != TSDB_CODE_SUCCESS) {
qError("s-task:%s failed to encode delete request", pTask->id.idStr);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
......@@ -168,11 +149,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) };
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
......@@ -182,6 +159,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (NULL == reqs.pArray) {
goto _end;
}
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq;
......@@ -204,11 +182,13 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
// set tag name
......@@ -271,7 +251,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
crTblArray = NULL;
} else {
SSubmitTbData tbData = {0};
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end;
......@@ -405,8 +385,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} else {
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv =
(SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
// address copy, no value
SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册