提交 eacde27f 编写于 作者: L Liu Jicong

refactor: remove unused code

上级 25f24af0
......@@ -203,10 +203,6 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen);
// sma
int32_t smaInit();
void smaCleanUp();
......
......@@ -155,6 +155,200 @@ _exit:
return code;
}
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema,
SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName,
SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
void *pBuf = NULL;
int32_t len = 0;
SSubmitReq2 *pReq = NULL;
SArray *tagArray = NULL;
SArray *createTbArray = NULL;
SArray *pVals = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end;
}
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
goto _end;
}
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
// create table req
if (createTb) {
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
SVCreateTbReq *pCreateTbReq = NULL;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
taosArrayPush(createTbArray, &pCreateTbReq);
continue;
}
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
goto _end;
};
// don't move to the end of loop as to destroy in the end of func when error occur
taosArrayPush(createTbArray, &pCreateTbReq);
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = strdup((char *)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag *pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t *)pTag;
// set tag name
SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
} else {
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
}
}
// SSubmitTbData req
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
int32_t rows = pDataBlock->info.rows;
SSubmitTbData *pTbData = (SSubmitTbData *)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
taosMemoryFree(pTbData);
goto _end;
}
pTbData->suid = suid;
pTbData->uid = 0; // uid is assigned by vnode
pTbData->sver = pTSchema->version;
if (createTb) {
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
}
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
goto _end;
}
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn *pCol = &pTSchema->columns[k];
SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void *data = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
}
}
SRow *pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
}
// encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead *)pBuf)->vgId = htonl(TD_VID(pVnode));
((SMsgHead *)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
}
tEncoderClear(&encoder);
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
if (terrno != 0) {
rpcFreeCont(pBuf);
taosArrayDestroy(pDeleteReq->deleteReqs);
return TSDB_CODE_FAILED;
}
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
return TSDB_CODE_SUCCESS;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
*
......@@ -220,8 +414,9 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
void *pSubmitReq = NULL;
int32_t contLen = 0;
if (tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen) < 0) {
if (smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq,
&contLen) < 0) {
smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
......
......@@ -71,428 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0;
}
#if 0
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t sz = taosArrayGetSize(pBlocks);
if (createTb) {
schemaReqs = taosArrayInit(sz, sizeof(void*));
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
int32_t padding1 = 0;
void* padding2 = NULL;
taosArrayPush(schemaReqSz, &padding1);
taosArrayPush(schemaReqs, &padding2);
continue;
}
// STag* pTag = NULL;
// taosArrayClear(tagArray);
// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
// STagVal tagVal = {
// .cid = pTagSchemaWrapper->pSchema[j].colId,
// .type = pTagSchemaWrapper->pSchema[j].type,
// .i64 = (int64_t)pDataBlock->info.id.groupId,
// };
// taosArrayPush(tagArray, &tagVal);
// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
// }
//
// tTagNew(tagArray, 1, false, &pTag);
// if (pTag == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// taosArrayDestroy(tagArray);
// taosArrayDestroy(tagName);
// return NULL;
// }
SVCreateTbReq createTbReq = {0};
// set const
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
createTbReq.ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
createTbReq.name = strdup(pDataBlock->info.parTbName);
} else {
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
// save schema len
int32_t code;
int32_t schemaLen;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
taosArrayPush(schemaReqSz, &schemaLen);
// save schema str
void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
taosArrayPush(schemaReqs, &schemaStr);
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaStr, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
tEncoderClear(&encoder);
return NULL;
}
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
}
}
taosArrayDestroy(tagArray);
// cal size
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
continue;
}
int32_t rows = pDataBlock->info.rows;
/*int32_t rowSize = pDataBlock->info.rowSize;*/
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) {
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
ret = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
int32_t dataLen = 0;
int32_t schemaLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
if (createTb) {
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
void* schemaStr = taosArrayGetP(schemaReqs, i);
memcpy(blkSchema, schemaStr, schemaLen);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
}
ret->length = htonl(ret->length);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return ret;
}
#endif
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen) {
void* pBuf = NULL;
int32_t len = 0;
SSubmitReq2* pReq = NULL;
SArray* tagArray = NULL;
SArray* createTbArray = NULL;
SArray* pVals = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end;
}
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
goto _end;
}
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
// create table req
if (createTb) {
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
SVCreateTbReq* pCreateTbReq = NULL;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
taosArrayPush(createTbArray, &pCreateTbReq);
continue;
}
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
goto _end;
};
// don't move to the end of loop as to destroy in the end of func when error occur
taosArrayPush(createTbArray, &pCreateTbReq);
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
} else {
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
}
}
// SSubmitTbData req
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
int32_t rows = pDataBlock->info.rows;
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
taosMemoryFree(pTbData);
goto _end;
}
pTbData->suid = suid;
pTbData->uid = 0; // uid is assigned by vnode
pTbData->sver = pTSchema->version;
if (createTb) {
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
}
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
goto _end;
}
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void* data = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
}
// encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode));
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
}
tEncoderClear(&encoder);
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
if (terrno != 0) {
rpcFreeCont(pBuf);
taosArrayDestroy(pDeleteReq->deleteReqs);
return TSDB_CODE_FAILED;
}
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
return TSDB_CODE_SUCCESS;
}
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
......@@ -984,56 +562,3 @@ _end:
taosArrayDestroy(pVals);
// TODO: change
}
#if 0
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
SBatchDeleteReq deleteReq = {0};
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
taosArrayDestroy(deleteReq.deleteReqs);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = submitReq,
.contLen = ntohl(submitReq->length),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册