提交 09a04052 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 11dccf12
...@@ -212,14 +212,6 @@ enum { ...@@ -212,14 +212,6 @@ enum {
FETCH_TYPE__NONE, FETCH_TYPE__NONE,
}; };
typedef struct {
int8_t fetchType;
union {
SSDataBlock data;
void* meta;
};
} SFetchRet;
typedef struct SVarColAttr { typedef struct SVarColAttr {
int32_t* offset; // start position for each entry in the list int32_t* offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data uint32_t length; // used buffer size that contain the valid data
......
...@@ -415,7 +415,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p ...@@ -415,7 +415,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
return pSW; return pSW;
} }
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { static FORCE_INLINE void tDeleteSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
if (pSchemaWrapper) { if (pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema); taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper); taosMemoryFree(pSchemaWrapper);
...@@ -3421,10 +3421,10 @@ typedef struct { ...@@ -3421,10 +3421,10 @@ typedef struct {
char data[]; // SSubmitReq2 char data[]; // SSubmitReq2
} SSubmitReq2Msg; } SSubmitReq2Msg;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySSubmitReq(SSubmitReq2* pReq, int32_t flag); void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);
typedef struct { typedef struct {
int32_t affectedRows; int32_t affectedRows;
......
...@@ -192,8 +192,6 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); ...@@ -192,8 +192,6 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
......
...@@ -147,8 +147,6 @@ typedef struct SWalReader { ...@@ -147,8 +147,6 @@ typedef struct SWalReader {
int64_t curFileFirstVer; int64_t curFileFirstVer;
int64_t curVersion; int64_t curVersion;
int64_t capacity; int64_t capacity;
// int8_t curInvalid;
// int8_t curStopped;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
// TODO remove it // TODO remove it
......
...@@ -191,7 +191,7 @@ void taos_free_result(TAOS_RES *res) { ...@@ -191,7 +191,7 @@ void taos_free_result(TAOS_RES *res) {
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx // taosx
taosArrayDestroy(pRsp->rsp.createTableLen); taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
...@@ -204,7 +204,7 @@ void taos_free_result(TAOS_RES *res) { ...@@ -204,7 +204,7 @@ void taos_free_result(TAOS_RES *res) {
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo); doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
......
...@@ -325,7 +325,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { ...@@ -325,7 +325,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
taosHashCleanup(pStmt->exec.pBlockHash); taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL; pStmt->exec.pBlockHash = NULL;
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData); taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(stmtCleanBindInfo(pStmt)); STMT_ERR_RET(stmtCleanBindInfo(pStmt));
...@@ -895,7 +895,7 @@ int stmtExec(TAOS_STMT* stmt) { ...@@ -895,7 +895,7 @@ int stmtExec(TAOS_STMT* stmt) {
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else { } else {
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData); taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
......
...@@ -864,7 +864,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { ...@@ -864,7 +864,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen); taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
...@@ -877,7 +877,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { ...@@ -877,7 +877,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen); taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx // taosx
taosArrayDestroy(pRsp->taosxRsp.createTableLen); taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
......
...@@ -1053,9 +1053,9 @@ TEST(clientCase, sub_db_test) { ...@@ -1053,9 +1053,9 @@ TEST(clientCase, sub_db_test) {
} }
TEST(clientCase, sub_tb_test) { TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "/home/tests/dir/cfg/"); taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("vm116", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
...@@ -1091,7 +1091,7 @@ TEST(clientCase, sub_tb_test) { ...@@ -1091,7 +1091,7 @@ TEST(clientCase, sub_tb_test) {
int32_t precision = 0; int32_t precision = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t msgCnt = 0; int32_t msgCnt = 0;
int32_t timeout = 25000; int32_t timeout = 2500000;
int32_t count = 0; int32_t count = 0;
...@@ -1117,10 +1117,10 @@ TEST(clientCase, sub_tb_test) { ...@@ -1117,10 +1117,10 @@ TEST(clientCase, sub_tb_test) {
fields = taos_fetch_fields(pRes); fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes); numOfFields = taos_field_count(pRes);
totalRows += 1; totalRows += 1;
if (totalRows % 100000 == 0) { // if (totalRows % 100000 == 0) {
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf); printf("row content: %s\n", buf);
} // }
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -2374,7 +2374,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -2374,7 +2374,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
SRow* pRow = NULL; SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
ASSERT(pRow); ASSERT(pRow);
...@@ -2388,7 +2388,7 @@ _end: ...@@ -2388,7 +2388,7 @@ _end:
if (terrno != 0) { if (terrno != 0) {
*ppReq = NULL; *ppReq = NULL;
if (pReq) { if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} }
......
...@@ -1509,7 +1509,9 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) { ...@@ -1509,7 +1509,9 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
// STSchema ======================================== // STSchema ========================================
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) { STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols); STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
if (pTSchema == NULL) return NULL; if (pTSchema == NULL) {
return NULL;
}
pTSchema->numOfCols = numOfCols; pTSchema->numOfCols = numOfCols;
pTSchema->version = version; pTSchema->version = version;
......
...@@ -7058,7 +7058,7 @@ void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { ...@@ -7058,7 +7058,7 @@ void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL; pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL; pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL; pRsp->blockTbName = NULL;
...@@ -7159,7 +7159,7 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { ...@@ -7159,7 +7159,7 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
pRsp->blockDataLen = NULL; pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL; pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL; pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL; pRsp->blockTbName = NULL;
...@@ -7332,7 +7332,7 @@ _exit: ...@@ -7332,7 +7332,7 @@ _exit:
return 0; return 0;
} }
int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) { int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1; if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1;
...@@ -7344,7 +7344,7 @@ int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) { ...@@ -7344,7 +7344,7 @@ int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) {
return 0; return 0;
} }
int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 *pReq) { int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
int32_t code = 0; int32_t code = 0;
memset(pReq, 0, sizeof(*pReq)); memset(pReq, 0, sizeof(*pReq));
...@@ -7387,7 +7387,7 @@ _exit: ...@@ -7387,7 +7387,7 @@ _exit:
return code; return code;
} }
void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
if (NULL == pTbData) { if (NULL == pTbData) {
return; return;
} }
...@@ -7433,14 +7433,14 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { ...@@ -7433,14 +7433,14 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
} }
} }
void tDestroySSubmitReq(SSubmitReq2 *pReq, int32_t flag) { void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return; if (pReq->aSubmitTbData == NULL) return;
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData); SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) { for (int32_t i = 0; i < nSubmitTbData; i++) {
tDestroySSubmitTbData(&aSubmitTbData[i], flag); tDestroySubmitTbData(&aSubmitTbData[i], flag);
} }
taosArrayDestroy(pReq->aSubmitTbData); taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL; pReq->aSubmitTbData = NULL;
......
...@@ -231,7 +231,7 @@ typedef struct SSnapContext { ...@@ -231,7 +231,7 @@ typedef struct SSnapContext {
} SSnapContext; } SSnapContext;
typedef struct STqReader { typedef struct STqReader {
SPackedData msg2; SPackedData msg;
SSubmitReq2 submit; SSubmitReq2 submit;
int32_t nextBlk; int32_t nextBlk;
int64_t lastBlkUid; int64_t lastBlkUid;
...@@ -242,7 +242,7 @@ typedef struct STqReader { ...@@ -242,7 +242,7 @@ typedef struct STqReader {
int32_t cachedSchemaVer; int32_t cachedSchemaVer;
int64_t cachedSchemaSuid; int64_t cachedSchemaSuid;
SSchemaWrapper *pSchemaWrapper; SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema; SSDataBlock *pResBlock;
} STqReader; } STqReader;
STqReader *tqReaderOpen(SVnode *pVnode); STqReader *tqReaderOpen(SVnode *pVnode);
...@@ -255,6 +255,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); ...@@ -255,6 +255,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock); int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
int32_t tqNextBlockInWal(STqReader* pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
......
...@@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode); ...@@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*); void tqNotifyClose(STQ*);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
......
...@@ -639,7 +639,6 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) { ...@@ -639,7 +639,6 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
SSchema *pSchema = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock); pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL; if (!pSW) return NULL;
......
...@@ -217,8 +217,8 @@ typedef struct STableInfoForChildTable { ...@@ -217,8 +217,8 @@ typedef struct STableInfoForChildTable {
static void destroySTableInfoForChildTable(void* data) { static void destroySTableInfoForChildTable(void* data) {
STableInfoForChildTable* pData = (STableInfoForChildTable*)data; STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
taosMemoryFree(pData->tableName); taosMemoryFree(pData->tableName);
tDeleteSSchemaWrapper(pData->schemaRow); tDeleteSchemaWrapper(pData->schemaRow);
tDeleteSSchemaWrapper(pData->tagRow); tDeleteSchemaWrapper(pData->tagRow);
} }
static void MoveToSnapShotVersion(SSnapContext* ctx) { static void MoveToSnapShotVersion(SSnapContext* ctx) {
......
...@@ -673,8 +673,8 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) ...@@ -673,8 +673,8 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
metaUpdateUidIdx(pMeta, &nStbEntry); metaUpdateUidIdx(pMeta, &nStbEntry);
metaULock(pMeta); metaULock(pMeta);
tDeleteSSchemaWrapper(tag); tDeleteSchemaWrapper(tag);
tDeleteSSchemaWrapper(row); tDeleteSchemaWrapper(row);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc); tDecoderClear(&dc);
......
...@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s", " failed since %s",
...@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
if (pReq) { if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
} }
......
...@@ -299,7 +299,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -299,7 +299,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
SRow *pRow = NULL; SRow *pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) { if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
...@@ -309,7 +309,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -309,7 +309,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
// encode // encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); tEncodeSize(tEncodeSubmitReq, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) { if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
...@@ -321,7 +321,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -321,7 +321,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg *)pBuf)->version = htobe64(1); ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { if (tEncodeSubmitReq(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/ /*vError("failed to encode submit req since %s", terrstr());*/
} }
...@@ -332,7 +332,7 @@ _end: ...@@ -332,7 +332,7 @@ _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
if (pReq) { if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
......
...@@ -287,7 +287,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v ...@@ -287,7 +287,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
} }
int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = (STqHandle*) handle; STqHandle* pHandle = (STqHandle*) handle;
if(pHandle->msg == NULL){ if(pHandle->msg == NULL){
......
此差异已折叠。
...@@ -215,7 +215,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -215,7 +215,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*)); pSchemas = taosArrayInit(0, sizeof(void*));
continue; continue;
...@@ -274,7 +274,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -274,7 +274,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*)); pSchemas = taosArrayInit(0, sizeof(void*));
continue; continue;
......
...@@ -672,7 +672,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -672,7 +672,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} }
SRow* pRow = NULL; SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
ASSERT(pRow); ASSERT(pRow);
...@@ -681,7 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -681,7 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SSubmitReq2 submitReq = {0}; SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
...@@ -690,28 +690,28 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -690,28 +690,28 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// encode // encode
int32_t len; int32_t len;
int32_t code; int32_t code;
tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder); tEncoderClear(&encoder);
rpcFreeCont(pBuf); rpcFreeCont(pBuf);
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
continue; continue;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
......
...@@ -180,7 +180,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -180,7 +180,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// lock // lock
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
code = tqRegisterPushEntry(pTq, pHandle, pMsg); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
return code; return code;
......
...@@ -1007,7 +1007,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) { ...@@ -1007,7 +1007,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
} }
tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema); tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
if (NULL == pCxt->pTbData) { if (NULL == pCxt->pTbData) {
pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData)); pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pCxt->pTbData) { if (NULL == pCxt->pTbData) {
...@@ -1039,7 +1039,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) { ...@@ -1039,7 +1039,7 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) { static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
taosMemoryFreeClear(pCxt->pTbSchema); taosMemoryFreeClear(pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pCxt->pTbData); taosMemoryFreeClear(pCxt->pTbData);
taosArrayDestroy(pCxt->pColValues); taosArrayDestroy(pCxt->pColValues);
} }
...@@ -1149,7 +1149,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { ...@@ -1149,7 +1149,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char *pMsg = NULL; char *pMsg = NULL;
uint32_t msglen = 0; uint32_t msglen = 0;
tEncodeSize(tEncodeSSubmitReq2, pSubmitReq, msglen, code); tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pMsg = taosMemoryMalloc(msglen); pMsg = taosMemoryMalloc(msglen);
if (NULL == pMsg) { if (NULL == pMsg) {
...@@ -1159,7 +1159,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { ...@@ -1159,7 +1159,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, pMsg, msglen); tEncoderInit(&encoder, pMsg, msglen);
code = tEncodeSSubmitReq2(&encoder, pSubmitReq); code = tEncodeSubmitReq(&encoder, pSubmitReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1199,7 +1199,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -1199,7 +1199,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
len -= sizeof(SSubmitReq2Msg); len -= sizeof(SSubmitReq2Msg);
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, pReq, len); tDecoderInit(&dc, pReq, len);
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) { if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }
...@@ -1388,7 +1388,7 @@ _exit: ...@@ -1388,7 +1388,7 @@ _exit:
// clear // clear
taosArrayDestroy(newTbUids); taosArrayDestroy(newTbUids);
tDestroySSubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE); tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code; if (code) terrno = code;
......
...@@ -126,7 +126,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int ...@@ -126,7 +126,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t len = 0; int32_t len = 0;
void* pBuf = NULL; void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
...@@ -138,7 +138,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int ...@@ -138,7 +138,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSubmitReq(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
...@@ -281,7 +281,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp ...@@ -281,7 +281,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
SRow* pRow = NULL; SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
...@@ -301,7 +301,7 @@ _end: ...@@ -301,7 +301,7 @@ _end:
if (terrno != 0) { if (terrno != 0) {
*ppReq = NULL; *ppReq = NULL;
if (pReq) { if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
return terrno; return terrno;
...@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 ...@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid); code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid);
if (code) { if (code) {
if (pReq) { if (pReq) {
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
...@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 ...@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
} }
code = submitReqToMsg(vgId, pReq, pMsg, msgLen); code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
return code; return code;
......
...@@ -1052,19 +1052,6 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s ...@@ -1052,19 +1052,6 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if ((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)) {
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
qDebug("set the submit block for future scan");
pTaskInfo->streamInfo.submit = submit;
return 0;
}
void qStreamSetOpen(qTaskInfo_t tinfo) { void qStreamSetOpen(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
...@@ -1086,6 +1073,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1086,6 +1073,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pOperator == NULL) { if (pOperator == NULL) {
return -1; return -1;
} }
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base; STableScanBase* pScanBaseInfo = &pScanInfo->base;
...@@ -1221,7 +1209,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1221,7 +1209,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id); qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
......
...@@ -109,8 +109,8 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand ...@@ -109,8 +109,8 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) { void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname); taosMemoryFreeClear(pSchemaInfo->dbname);
taosMemoryFreeClear(pSchemaInfo->tablename); taosMemoryFreeClear(pSchemaInfo->tablename);
tDeleteSSchemaWrapper(pSchemaInfo->sw); tDeleteSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw); tDeleteSchemaWrapper(pSchemaInfo->qsw);
} }
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) { int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) {
...@@ -197,7 +197,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { ...@@ -197,7 +197,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
return pqSw; return pqSw;
} }
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); }
static void freeBlock(void* pParam) { static void freeBlock(void* pParam) {
SSDataBlock* pBlock = *(SSDataBlock**)pParam; SSDataBlock* pBlock = *(SSDataBlock**)pParam;
......
...@@ -1623,7 +1623,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1623,7 +1623,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.dataLoad = 1; pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock); // blockDataFreeRes((SSDataBlock*)pBlock);
calBlockTbName(pInfo, pInfo->pRes); calBlockTbName(pInfo, pInfo->pRes);
return 0; return 0;
...@@ -1637,7 +1637,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1637,7 +1637,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id); qDebug("start to exec queue scan, %s", id);
if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) { if (pInfo->tqReader->msg.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit; SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr); qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
...@@ -1663,7 +1663,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1663,7 +1663,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->msg = (SPackedData){0};
pTaskInfo->streamInfo.submit = (SPackedData){0}; pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL; return NULL;
} }
...@@ -1689,17 +1689,17 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1689,17 +1689,17 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
SSDataBlock block = {0}; int32_t type = tqNextBlockInWal(pInfo->tqReader);
int32_t type = tqNextBlock(pInfo->tqReader, &block); SSDataBlock* pRes = pInfo->tqReader->pResBlock;
// curVersion move to next, so currentOffset = curVersion - 1 // curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
if (type == FETCH_TYPE__DATA) { if (type == FETCH_TYPE__DATA) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, block.info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &block, true); setBlockIntoRes(pInfo, pRes, true);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
...@@ -2055,7 +2055,7 @@ FETCH_NEXT_BLOCK: ...@@ -2055,7 +2055,7 @@ FETCH_NEXT_BLOCK:
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
while (1) { while (1) {
if (pInfo->tqReader->msg2.msgStr == NULL) { if (pInfo->tqReader->msg.msgStr == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) { if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
...@@ -2191,7 +2191,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2191,7 +2191,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
} }
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
tDeleteSSchemaWrapper(mtInfo.schema); tDeleteSchemaWrapper(mtInfo.schema);
return NULL; return NULL;
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
......
...@@ -313,7 +313,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) { ...@@ -313,7 +313,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
insDestroyBoundColInfo(&pTableCxt->boundColsInfo); insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
taosArrayDestroyEx(pTableCxt->pValues, destroyColVal); taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
if (pTableCxt->pData) { if (pTableCxt->pData) {
tDestroySSubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pTableCxt->pData); taosMemoryFree(pTableCxt->pData);
} }
taosMemoryFree(pTableCxt); taosMemoryFree(pTableCxt);
...@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { ...@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
return; return;
} }
tDestroySSubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pVgCxt->pData); taosMemoryFree(pVgCxt->pData);
taosMemoryFree(pVgCxt); taosMemoryFree(pVgCxt);
} }
...@@ -499,7 +499,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin ...@@ -499,7 +499,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0; uint32_t len = 0;
void* pBuf = NULL; void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
...@@ -511,7 +511,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin ...@@ -511,7 +511,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSubmitReq(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
......
...@@ -193,7 +193,7 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -193,7 +193,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册