提交 00587753 编写于 作者: H Hongze Cheng

feat: TD-15433

上级 65fe36ff
...@@ -252,21 +252,29 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); ...@@ -252,21 +252,29 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int8_t hashMeta;
int32_t vnode; // vnode index of failed block int64_t uid;
int32_t sid; // table index of failed block union {
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table char* name;
} SSubmitRspBlock; const char* namec;
};
typedef struct { int32_t numOfRows;
int32_t code; // 0-success, > 0 error code int32_t affectedRows;
int32_t numOfRows; // number of records the client is trying to write } SSubmitBlkRsp;
int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records) typedef struct {
int32_t numOfFailedBlocks; int32_t numOfRows;
SSubmitRspBlock failedBlocks[]; int32_t affectedRows;
int32_t nBlocks;
union {
SArray* pArray;
SSubmitBlkRsp* pBlocks;
};
} SSubmitRsp; } SSubmitRsp;
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
#define SCHEMA_SMA_ON 0x1 #define SCHEMA_SMA_ON 0x1
#define SCHEMA_IDX_ON 0x2 #define SCHEMA_IDX_ON 0x2
typedef struct SSchema { typedef struct SSchema {
......
...@@ -4013,3 +4013,65 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { ...@@ -4013,3 +4013,65 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
if (tEncodeCStr(pEncoder, pBlock->name) < 0) return -1;
}
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
if (tDecodeCStr(pDecoder, &pBlock->namec) < 0) return -1;
}
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) {
int32_t nBlocks = taosArrayGetSize(pRsp->pArray);
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1;
if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1;
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
pRsp->pBlocks = tDecoderMalloc(pDecoder, sizeof(*pRsp->pBlocks) * pRsp->nBlocks);
if (pRsp->pBlocks == NULL) return -1;
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
...@@ -102,7 +102,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version ...@@ -102,7 +102,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t* pAffectedRows); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......
...@@ -288,7 +288,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -288,7 +288,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0; return 0;
} }
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) { int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STsdbMemTable *pMemTable = pTsdb->mem; STsdbMemTable *pMemTable = pTsdb->mem;
void *tptr; void *tptr;
...@@ -342,7 +342,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -342,7 +342,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
(*pAffectedRows) = pMsgIter->numOfRows; pRsp->numOfRows = pRsp->numOfRows;
pRsp->affectedRows = pRsp->affectedRows;
return 0; return 0;
} }
......
...@@ -36,9 +36,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -36,9 +36,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
// loop to insert // loop to insert
tInitSubmitMsgIter(pMsg, &msgIter); tInitSubmitMsgIter(pMsg, &msgIter);
while (true) { while (true) {
SSubmitBlkRsp r = {0};
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) { if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) {
return -1; return -1;
} }
...@@ -46,8 +47,8 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -46,8 +47,8 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
} }
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->affectedRows = affectedrows; // pRsp->affectedRows = affectedrows;
pRsp->numOfRows = numOfRows; // pRsp->numOfRows = numOfRows;
} }
return 0; return 0;
......
...@@ -497,7 +497,7 @@ _exit: ...@@ -497,7 +497,7 @@ _exit:
return 0; return 0;
} }
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char* tags) { static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SMeta *pMeta = pVnode->pMeta; SMeta *pMeta = pVnode->pMeta;
...@@ -518,11 +518,11 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char ...@@ -518,11 +518,11 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
taosMemoryFreeClear(pSchema); taosMemoryFreeClear(pSchema);
} }
pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema
if(pSchema) { if (pSchema) {
suid = msgIter.suid; suid = msgIter.suid;
} }
} }
if(!pSchema) { if (!pSchema) {
printf("%s:%d no valid schema\n", tags, __LINE__); printf("%s:%d no valid schema\n", tags, __LINE__);
continue; continue;
} }
...@@ -540,12 +540,15 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char ...@@ -540,12 +540,15 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock; SSubmitBlk *pBlock;
SSubmitRsp rsp = {0}; SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t nRows; int32_t nRows;
int32_t tsize, ret;
SEncoder encoder = {0};
pRsp->code = 0; pRsp->code = 0;
...@@ -559,12 +562,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -559,12 +562,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit; goto _exit;
} }
for (;;) { submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
for (int i = 0;;) {
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
SSubmitBlkRsp submitBlkRsp = {0};
// create table for auto create table mode // create table for auto create table mode
if (msgIter.schemaLen > 0) { if (msgIter.schemaLen > 0) {
submitBlkRsp.hashMeta = 1;
tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG; pRsp->code = TSDB_CODE_INVALID_MSG;
...@@ -580,6 +588,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -580,6 +588,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
} }
submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.name = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
sprintf(submitBlkRsp.name, "%s.%s", pVnode->config.dbname, createTbReq.name);
msgIter.uid = createTbReq.uid; msgIter.uid = createTbReq.uid;
if (createTbReq.type == TSDB_CHILD_TABLE) { if (createTbReq.type == TSDB_CHILD_TABLE) {
msgIter.suid = createTbReq.ctb.suid; msgIter.suid = createTbReq.ctb.suid;
...@@ -590,20 +602,29 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -590,20 +602,29 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
tDecoderClear(&decoder); tDecoderClear(&decoder);
} }
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) { if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _exit; goto _exit;
} }
rsp.affectedRows += nRows; submitRsp.numOfRows += submitBlkRsp.numOfRows;
submitRsp.affectedRows += submitBlkRsp.affectedRows;
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
} }
_exit: _exit:
// encode the response (TODO) tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); pRsp->pCont = rpcMallocCont(tsize);
memcpy(pRsp->pCont, &rsp, sizeof(rsp)); pRsp->contLen = tsize;
pRsp->contLen = sizeof(SSubmitRsp); tEncoderInit(&encoder, pRsp->pCont, tsize);
tEncodeSSubmitRsp(&encoder, &submitRsp);
tEncoderClear(&encoder);
for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].name);
}
taosArrayDestroy(submitRsp.pArray);
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
......
...@@ -1135,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1135,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
if (msg) { if (msg) {
SSubmitRsp *rsp = (SSubmitRsp *)msg; SSubmitRsp *rsp = (SSubmitRsp *)msg;
SCH_ERR_JRET(rsp->code); // SCH_ERR_JRET(rsp->code);
} }
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册