提交 246311d4 编写于 作者: S Shengliang Guan

minor changes

上级 7ce10a86
......@@ -201,18 +201,6 @@ typedef struct SSubmitBlk {
char data[];
} SSubmitBlk;
typedef struct {
/* data */
} SSubmitReq;
typedef struct {
/* data */
} SSubmitRsp;
typedef struct {
/* data */
} SSubmitReqReader;
// Submit message for this TSDB
typedef struct {
SMsgHead header;
......@@ -220,7 +208,7 @@ typedef struct {
int32_t length;
int32_t numOfBlocks;
char blocks[];
} SSubmitMsg;
} SSubmitReq;
typedef struct {
int32_t totalLen;
......@@ -234,7 +222,7 @@ typedef struct {
void* pMsg;
} SSubmitMsgIter;
int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
......@@ -244,16 +232,16 @@ typedef struct {
int32_t vnode; // vnode index of failed block
int32_t sid; // table index of failed block
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SShellSubmitRspBlock;
} SSubmitRspBlock;
typedef struct {
int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks;
SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRsp;
int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks;
SSubmitRspBlock failedBlocks[];
} SSubmitRsp;
typedef struct SSchema {
int8_t type;
......@@ -888,7 +876,6 @@ typedef struct {
int8_t precision;
int8_t compressed;
int32_t compLen;
int32_t numOfRows;
char data[];
} SRetrieveTableRsp;
......
......@@ -135,7 +135,7 @@ typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SVnodeModifOpStmtInfo {
......
......@@ -27,7 +27,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
......@@ -36,7 +36,7 @@ int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= sizeof(SSubmitMsg)) {
if (pMsg->length <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
......@@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitMsg);
pIter->len += sizeof(SSubmitReq);
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
......
......@@ -16,14 +16,11 @@
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "mndProfile.h"
//#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndMnode.h"
#include "mndShow.h"
//#include "mndTopic.h"
#include "mndUser.h"
//#include "mndVgroup.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
......
......@@ -83,7 +83,7 @@ typedef struct {
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
......
......@@ -72,7 +72,7 @@ typedef struct {
int64_t ver;
int64_t tbUid;
SHashObj *tbIdHash;
const SSubmitMsg *pMsg;
const SSubmitReq *pMsg;
SSubmitBlk *pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
......@@ -225,7 +225,7 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
return 0;
}
void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitMsg *pMsg, int64_t ver);
void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
// return SArray<SColumnInfoData>
......
......@@ -54,7 +54,7 @@ typedef struct STsdbMemTable {
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
......
......@@ -251,7 +251,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -397,7 +397,7 @@ int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset++;
}
if (skip == 1) continue;
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
......
......@@ -31,7 +31,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
return pReadHandle;
}
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
......
......@@ -15,7 +15,7 @@
#include "tsdbDef.h"
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg);
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
static STbData *tsdbNewTbData(tb_uid_t uid);
static void tsdbFreeTbData(STbData *pTbData);
......@@ -73,7 +73,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
}
}
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp) {
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitBlk * pBlock = NULL;
SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0;
......@@ -227,7 +227,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
......@@ -455,7 +455,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
/* ------------------------ REFACTORING ------------------------ */
#if 0
int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) {
int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitReq *pMsg) {
SMemAllocator *pMA = pMemTable->pMA;
STbData * pTbData = (STbData *)TD_MA_MALLOC(pMA, sizeof(*pTbData));
if (pTbData == NULL) {
......@@ -496,9 +496,9 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow* row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static STSRow* tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitReq *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, STSRow* row);
......
......@@ -15,7 +15,7 @@
#include "tsdbDef.h"
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp) {
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
// Check if mem is there. If not, create one.
if (pTsdb->mem == NULL) {
pTsdb->mem = tsdbNewMemTable(pTsdb);
......
......@@ -109,7 +109,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// }
break;
case TDMT_VND_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr, NULL) < 0) {
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) {
// TODO: handle error
}
break;
......
......@@ -417,7 +417,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
}
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) {
const int INSERT_HEAD_SIZE = sizeof(SSubmitMsg);
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......
......@@ -121,7 +121,7 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
}
static void buildMsgHeader(SVgDataBlocks* blocks) {
SSubmitMsg* submit = (SSubmitMsg*)blocks->pData;
SSubmitReq* submit = (SSubmitReq*)blocks->pData;
submit->header.vgId = htonl(blocks->vg.vgId);
submit->header.contLen = htonl(blocks->size);
submit->length = submit->header.contLen;
......
......@@ -70,7 +70,7 @@ protected:
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SSubmitMsg* submit = (SSubmitMsg*)vg->pData;
SSubmitReq* submit = (SSubmitReq*)vg->pData;
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
......@@ -93,7 +93,7 @@ protected:
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
ASSERT_EQ(vg->numOfTables, numOfTables);
ASSERT_GE(vg->size, 0);
SSubmitMsg* submit = (SSubmitMsg*)vg->pData;
SSubmitReq* submit = (SSubmitReq*)vg->pData;
ASSERT_GE(ntohl(submit->length), 0);
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
......
......@@ -823,7 +823,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
SSubmitRsp *rsp = (SSubmitRsp *)msg;
if (rsp) {
pJob->resNumOfRows += rsp->affectedRows;
}
......
......@@ -273,7 +273,7 @@ void *schtSendRsp(void *param) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SShellSubmitRsp rsp = {0};
SSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册