diff --git a/include/client/taos.h b/include/client/taos.h index 66e3c491c775791d6a67129f0bd402f5aa1aa898..a102629efa425ca71143669f18091eef073fe414 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,27 +31,27 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_JSON 15 // json string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_DECIMAL 17 // decimal -#define TSDB_DATA_TYPE_BLOB 18 // binary +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_JSON 15 // json string +#define TSDB_DATA_TYPE_VARBINARY 16 // binary +#define TSDB_DATA_TYPE_DECIMAL 17 // decimal +#define TSDB_DATA_TYPE_BLOB 18 // binary #define TSDB_DATA_TYPE_MEDIUMBLOB 19 -#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string +#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string typedef enum { TSDB_OPTION_LOCALE, @@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co void tmqShowMsg(tmq_message_t *tmq_message); int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); -typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); +/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT* stmt); +DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); +DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); + +/* ---------------------- OTHER ---------------------------- */ +typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); + +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); #ifdef __cplusplus } diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 3ca120981835cbec20d971aa3ca2e5fbad549dad..385c123fec8f87c7c0785916139d2082e273f33c 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -62,18 +62,17 @@ typedef struct SDataBlockInfo { union {int64_t uid; int64_t blockId;}; } SDataBlockInfo; -typedef struct SConstantItem { - SColumnInfo info; - int32_t startRow; // run-length-encoding to save the space for multiple rows - int32_t endRow; - SVariant value; -} SConstantItem; +//typedef struct SConstantItem { +// SColumnInfo info; +// int32_t startRow; // run-length-encoding to save the space for multiple rows +// int32_t endRow; +// SVariant value; +//} SConstantItem; // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; SArray *pDataBlock; // SArray - SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. SDataBlockInfo info; } SSDataBlock; @@ -95,66 +94,15 @@ typedef struct SColumnInfoData { }; } SColumnInfoData; -static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { - int64_t tbUid = pBlock->info.uid; - int16_t numOfCols = pBlock->info.numOfCols; - int16_t hasVarCol = pBlock->info.hasVarCol; - int32_t rows = pBlock->info.rows; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, tbUid); - tlen += taosEncodeFixedI16(buf, numOfCols); - tlen += taosEncodeFixedI16(buf, hasVarCol); - tlen += taosEncodeFixedI32(buf, rows); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tlen += taosEncodeFixedI16(buf, pColData->info.colId); - tlen += taosEncodeFixedI16(buf, pColData->info.type); - tlen += taosEncodeFixedI32(buf, pColData->info.bytes); - int32_t colSz = rows * pColData->info.bytes; - tlen += taosEncodeBinary(buf, pColData->pData, colSz); - } - return tlen; -} - -static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { - int32_t sz; - - buf = taosDecodeFixedI64(buf, &pBlock->info.uid); - buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols); - buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); - buf = taosDecodeFixedI32(buf, &pBlock->info.rows); - buf = taosDecodeFixedI32(buf, &sz); - pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < sz; i++) { - SColumnInfoData data = {0}; - buf = taosDecodeFixedI16(buf, &data.info.colId); - buf = taosDecodeFixedI16(buf, &data.info.type); - buf = taosDecodeFixedI32(buf, &data.info.bytes); - int32_t colSz = pBlock->info.rows * data.info.bytes; - buf = taosDecodeBinary(buf, (void**)&data.pData, colSz); - taosArrayPush(pBlock->pDataBlock, &data); - } - return (void*)buf; -} +void* blockDataDestroy(SSDataBlock* pBlock); +int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); +void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { if (pBlock == NULL) { return; } - - // int32_t numOfOutput = pBlock->info.numOfCols; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < sz; ++i) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); + blockDataDestroy(pBlock); } static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7e60013aa1911371b648229cdfd8e6cbfe3c2dc5..17e019d9775fb5714a95490db44937b3706736bc 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) +static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { + if (!pColumnInfoData->hasNull) { + return false; + } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return pColumnInfoData->varmeta.offset[row] == -1; + } else { + if (pColumnInfoData->nullbitmap == NULL) { + return false; + } + + return colDataIsNull_f(pColumnInfoData->nullbitmap, row); + } +} + static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { if (!pColumnInfoData->hasNull) { @@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u } } -#define BitmapLen(_n) (((_n) + ((1<> NBIT) - +#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) -#define colDataGetData(p1_, r_) \ +// SColumnInfoData, rowNumber +#define colDataGetData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \ : ((p1_)->pData + ((r_) * (p1_)->info.bytes))) @@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock); } #endif -#endif /*_TD_COMMON_EP_H_*/ +#endif /*_TD_COMMON_EP_H_*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d19193d764d8ce78ab07343609ed433ab016b179..32271ed15793ed3055e332160b53da30e46b9876 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -420,7 +420,7 @@ typedef struct { }; } SColumnFilterList; /* - * for client side struct, we only need the column id, type, bytes are not necessary + * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. */ typedef struct { @@ -2159,13 +2159,36 @@ typedef struct { SArray* topics; // SArray } SMqCMGetSubEpRsp; -struct tmq_message_t { +typedef struct { SMqRspHead head; union { SMqPollRsp consumeRsp; SMqCMGetSubEpRsp getEpRsp; }; void* extra; +} SMqMsgWrapper; + +typedef struct { + int32_t curBlock; + int32_t curRow; + void** uData; +} SMqRowIter; + +struct tmq_message_t_v1 { + SMqPollRsp rsp; + SMqRowIter iter; +}; + +struct tmq_message_t { + SMqRspHead head; + union { + SMqPollRsp consumeRsp; + SMqCMGetSubEpRsp getEpRsp; + }; + void* extra; + int32_t curBlock; + int32_t curRow; + void** uData; }; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6a07887721bd222338a6a1292229c6fbfa2d11bd..a596794b3d9ac493f775bd2d6f6bb74f12075036 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -189,6 +189,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index c8df40aedcaa2333f7153cebb94954e4d0e63587..b6f42eeee0ed23a98d595e03c786d9103feb6628 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -49,6 +49,10 @@ typedef struct STableComInfo { int32_t rowSize; // row size of the schema } STableComInfo; +typedef struct SIndexMeta { + +} SIndexMeta; + /* * ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(tableType == TSDB_CHILD_TABLE) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 1b41da33bbbcfa60567837c9a886533b42062b54..1dc154ce484e1a923852f877bdae4ec37d62b1b4 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -198,6 +198,16 @@ void tfsBasename(const STfsFile *pFile, char *dest); */ void tfsDirname(const STfsFile *pFile, char *dest); +/** + * @brief Get the absolute file name of rname. + * + * @param pTfs + * @param diskId + * @param rname relative file name + * @param aname absolute file name + */ +void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname); + /** * @brief Remove file in tfs. * diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ddd53be0838bf7c633a43b6849470b35f4c8be5..ee13527b968c5d648cbe83e4f57b7e72c71d7eed 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + pRsp->curBlock = 0; + pRsp->curRow = 0; + // TODO: alloc mem + /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { /*printf("no data\n");*/ @@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { goto END; } - // tmq's epoch is monotomically increase, + // tmq's epoch is monotonically increase, // so it's safe to discard any old epoch msg. - // epoch will only increase when received newer epoch ep msg + // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { @@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) { return "fail"; } +TAOS_ROW tmq_get_row(tmq_message_t* message) { + SMqPollRsp* rsp = &message->consumeRsp; + while (1) { + if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) { + SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock); + if (message->curRow < pBlock->info.rows) { + for (int i = 0; i < pBlock->info.numOfCols; i++) { + SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i); + if (colDataIsNull_s(pData, message->curRow)) + message->uData[i] = NULL; + else { + message->uData[i] = colDataGetData(pData, message->curRow); + } + } + message->curRow++; + return message->uData; + } else { + message->curBlock++; + message->curRow = 0; + continue; + } + } + return NULL; + } +} + +char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } + #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0d1f762db6e226fbbf32df16a4399d281aa82c1e..4b649d5f62e7139929d5aac87e20b3f0914d290f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -240,10 +240,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co } size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { - ASSERT(pBlock); - - size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0; - ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols); + ASSERT(pBlock && pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock)); return pBlock->info.numOfCols; } @@ -1166,3 +1163,67 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); } + +int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { + int64_t tbUid = pBlock->info.uid; + int16_t numOfCols = pBlock->info.numOfCols; + int16_t hasVarCol = pBlock->info.hasVarCol; + int32_t rows = pBlock->info.rows; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, tbUid); + tlen += taosEncodeFixedI16(buf, numOfCols); + tlen += taosEncodeFixedI16(buf, hasVarCol); + tlen += taosEncodeFixedI32(buf, rows); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tlen += taosEncodeFixedI16(buf, pColData->info.colId); + tlen += taosEncodeFixedI16(buf, pColData->info.type); + tlen += taosEncodeFixedI32(buf, pColData->info.bytes); + + if (IS_VAR_DATA_TYPE(pColData->info.type)) { + tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows); + } else { + tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows)); + } + + int32_t len = colDataGetLength(pColData, rows); + taosEncodeFixedI32(buf, len); + + tlen += taosEncodeBinary(buf, pColData->pData, len); + } + return tlen; +} + +void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { + int32_t sz; + + buf = taosDecodeFixedI64(buf, &pBlock->info.uid); + buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols); + buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); + buf = taosDecodeFixedI32(buf, &pBlock->info.rows); + buf = taosDecodeFixedI32(buf, &sz); + pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData data = {0}; + buf = taosDecodeFixedI16(buf, &data.info.colId); + buf = taosDecodeFixedI16(buf, &data.info.type); + buf = taosDecodeFixedI32(buf, &data.info.bytes); + + if (IS_VAR_DATA_TYPE(data.info.type)) { + buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t)); + data.varmeta.length = pBlock->info.rows * sizeof(int32_t); + data.varmeta.allocLen = data.varmeta.length; + } else { + buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows)); + } + + int32_t len = 0; + buf = taosDecodeFixedI32(buf, &len); + buf = taosDecodeBinary(buf, (void**)&data.pData, len); + taosArrayPush(pBlock->pDataBlock, &data); + } + return (void*)buf; +} \ No newline at end of file diff --git a/source/dnode/mgmt/daemon/src/dmnMain.c b/source/dnode/mgmt/daemon/src/dmnMain.c index 7ba272453ca247600024411c106af17a1dc6cd25..3d4de18dcb20ddb47faf7cd6dda1c7f8557166cb 100644 --- a/source/dnode/mgmt/daemon/src/dmnMain.c +++ b/source/dnode/mgmt/daemon/src/dmnMain.c @@ -27,7 +27,7 @@ static struct { } dmn = {0}; static void dmnSigintHandle(int signum, void *info, void *ctx) { - uInfo("singal:%d is received", signum); + uInfo("signal:%d is received", signum); dmn.stop = true; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index cc5bf843ce22164c051950b4c6e25780fcdc3fe7..f81dead3259c3e43d0287829dfcd33be6ba2fad1 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; + // TODO encode tasks + if (pObj->tasks) { + int32_t sz = taosArrayGetSize(pObj->tasks); + tEncodeI32(pEncoder, sz); + for (int32_t i = 0; i < sz; i++) { + SArray *pArray = taosArrayGet(pObj->tasks, i); + int32_t innerSz = taosArrayGetSize(pArray); + tEncodeI32(pEncoder, innerSz); + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask *pTask = taosArrayGet(pArray, j); + tEncodeSStreamTask(pEncoder, pTask); + } + } + } else { + tEncodeI32(pEncoder, 0); + } return pEncoder->pos; } @@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + if (sz != 0) { + pObj->tasks = taosArrayInit(sz, sizeof(SArray)); + for (int32_t i = 0; i < sz; i++) { + int32_t innerSz; + if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; + SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask task; + if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; + taosArrayPush(pArray, &task); + } + taosArrayPush(pObj->tasks, pArray); + } + } else { + pObj->tasks = NULL; + } return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 16b1ba8a5c9cc86db1735e7a87da81c969eda4cf..faef757e7625b14d69c395284c5e7a5921711c97 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { return -1; } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { // duplicatable @@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_SND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } } else { // not duplicatable @@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { return -1; } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_SND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } taosArrayPush(pStream->tasks, taskOneLevel); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 67011dfe8a3979e315eb234a0223ee245dcb9ea0..af0f354c11dc668353582e9a9cf862165df25e10 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -230,19 +230,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { + mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); mndTransDrop(pTrans); return -1; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { - mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); + SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index b4d45e83fb340202d39b5674dfbcf1d74b37179f..8d8ed2e427087798c7e6da03be47743cec526bfa 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -55,6 +55,7 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index fc6a3699d555774baa6366aad67b70653bc46411..9b21cf92ce83a161e56031cf8569f1337f3a180c 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -161,6 +161,7 @@ struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; STqPushMgr* tqPushMgr; + SHashObj* pStreamTasks; SWal* pWal; SMeta* pVnodeMeta; }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d15481b4aa54c6c5009f0030eca63b0685a749ec..8cdc250e8d5bbb9cbaac311a38ac2001144572a1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S return NULL; } + pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + return pTq; } @@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; return 0; } + +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { + SStreamTask* pTask = malloc(sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); + tDecodeSStreamTask(&decoder, pTask); + tCoderClear(&decoder); + + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); + + return 0; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 7a5c9c5c1e5886131eb59a997e7f1116834133f3..42bfebc77bbcfc5ff0765af252093a226691437f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -20,8 +20,7 @@ static const char *TSDB_SMA_DNAME[] = { "tsma", // TSDB_SMA_TYPE_TIME_RANGE "rsma", // TSDB_SMA_TYPE_ROLLUP }; -#define SMA_CHECK_HASH -#undef SMA_PRINT_DEBUG_LOG +#undef _TEST_SMA_PRINT_DEBUG_LOG_ #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 10 #define SMA_STORAGE_SPLIT_HOURS 24 @@ -33,8 +32,8 @@ static const char *TSDB_SMA_DNAME[] = { #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test #define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test typedef enum { - SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat - SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat + SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma + SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma } ESmaStorageLevel; typedef struct { @@ -47,6 +46,7 @@ typedef struct { int32_t iter; int32_t fid; } SmaFsIter; + typedef struct { STsdb * pTsdb; SDBFile dFile; @@ -71,11 +71,12 @@ typedef struct { } SSmaStatItem; struct SSmaStat { - SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem + SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem T_REF_DECLARE() }; // declaration of static functions + // expired window static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); @@ -159,22 +160,12 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) { return TSDB_CODE_SUCCESS; } - if (tsdbLockRepo(pTsdb) != 0) { - return TSDB_CODE_FAILED; - } - - if (*pEnv == NULL) { // 2nd phase check + if (*pEnv == NULL) { if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) { - tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } } - if (tsdbUnlockRepo(pTsdb) != 0) { - *pEnv = tsdbFreeSmaEnv(*pEnv); - return TSDB_CODE_FAILED; - } - return TSDB_CODE_SUCCESS; } @@ -222,11 +213,14 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { return TSDB_CODE_SUCCESS; } - // TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew. + /** + * 1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tsdbNew). + * 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if + * tsdbInitSmaStat invoked in other multithread environment later. + */ if (*pSmaStat == NULL) { *pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat)); if (*pSmaStat == NULL) { - // TODO: unlock terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } @@ -236,11 +230,9 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { if ((*pSmaStat)->smaStatItems == NULL) { tfree(*pSmaStat); - // TODO: unlock return TSDB_CODE_FAILED; } } - // TODO: unlock return TSDB_CODE_SUCCESS; } @@ -279,14 +271,17 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { } static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { + SSmaEnv *pEnv = NULL; + + // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: - if (pTsdb->pTSmaEnv) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pTSmaEnv)) != NULL) { return TSDB_CODE_SUCCESS; } break; case TSDB_SMA_TYPE_ROLLUP: - if (pTsdb->pRSmaEnv) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pRSmaEnv)) != NULL) { return TSDB_CODE_SUCCESS; } break; @@ -295,18 +290,38 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_FAILED; } - // SDiskID did = {0}; - SSmaEnv *pEnv = NULL; - char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/"; - if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } + // init sma env + tsdbLockRepo(pTsdb); + if (pTsdb->pTSmaEnv == NULL) { + char rname[TSDB_FILENAME_LEN] = {0}; + char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN? - if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - pTsdb->pTSmaEnv = pEnv; - } else { - pTsdb->pRSmaEnv = pEnv; + SDiskID did = {0}; + tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did); + if (did.level < 0 || did.id < 0) { + tsdbUnlockRepo(pTsdb); + return TSDB_CODE_FAILED; + } + tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname); + tfsAbsoluteName(pTsdb->pTfs, did, rname, aname); + + if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) { + tsdbUnlockRepo(pTsdb); + return TSDB_CODE_FAILED; + } + + if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) { + tsdbUnlockRepo(pTsdb); + return TSDB_CODE_FAILED; + } + + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv); + } else { + atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv); + } } + tsdbUnlockRepo(pTsdb); return TSDB_CODE_SUCCESS; }; @@ -379,7 +394,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { int8_t state = TSDB_SMA_STAT_EXPIRED; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) { + if (taosHashPut(pItem->expiredWindows, expiredWindows + i, sizeof(TSKEY), &state, sizeof(state)) != 0) { // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would // tell query module to query raw TS data. // N.B. @@ -497,12 +512,12 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k return TSDB_CODE_FAILED; } -#ifdef SMA_PRINT_DEBUG_LOG +#ifdef _TEST_SMA_PRINT_DEBUG_LOG_ uint32_t valueSize = 0; void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize); ASSERT(data != NULL); for (uint32_t v = 0; v < valueSize; v += 8) { - tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v)); + tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v)); } #endif return TSDB_CODE_SUCCESS; @@ -935,12 +950,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ tsdbCloseDBF(&tReadH.dFile); return TSDB_CODE_FAILED; } - tfree(result); -#ifdef SMA_PRINT_DEBUG_LOG + +#ifdef _TEST_SMA_PRINT_DEBUG_LOG_ for (uint32_t v = 0; v < valueSize; v += 8) { - tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v)); + tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v)); } #endif + tfree(result); // TODO: fill the result to output + #if 0 int32_t nResult = 0; int64_t lastKey = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index aba8100478e973bb84deb22beac5038f243f95f2..218b53c2abe42a8d0f197fc11db395339d68f9a5 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { return 0; } -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { void *ptr = NULL; if (pVnode->config.streamMode == 0) { @@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { switch (pMsg->msgType) { case TDMT_VND_CREATE_STB: { - SVCreateTbReq vCreateTbReq = {0}; + SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { break; } case TDMT_VND_ALTER_STB: { - SVCreateTbReq vAlterTbReq = {0}; + SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); free(vAlterTbReq.stbCfg.pSchema); @@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_TASK_DEPLOY: { + if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + } + } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index ae4e9b0a75c61f54ca923b0deaf734e3d01809da..86958da406ebb32eb57fe86d1ad76ef84a889168 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -301,8 +301,14 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { break; } + SDiskCfg pDisks = {.level = 0, .primary = 1}; + strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN); + int32_t numOfDisks = 1; + tsdb.pTfs = tfsOpen(&pDisks, numOfDisks); + ASSERT_NE(tsdb.pTfs, nullptr); + char *msg = (char *)calloc(1, 100); - assert(msg != NULL); + ASSERT_NE(msg, nullptr); ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); // init diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 80a0b3554cf23434f4624f995d8bdf0c37a8ced1..08e0ac55d7ecbf1267d9962242c5c8acf59c2539 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -2349,6 +2349,9 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) { + +} int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_API_ENTER(); @@ -2394,6 +2397,9 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) { + +} int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); @@ -2662,12 +2668,15 @@ _return: int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { CTG_API_ENTER(); - + + int32_t code = 0; if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - //TODO + CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList)); + +_return: CTG_API_LEAVE(TSDB_CODE_SUCCESS); } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 5785089a20beb96df31dcb7eb6a003d607f68d8e..ffb2ae3a26e3e356185e41ffc606d2672b6af320 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -163,15 +163,15 @@ typedef struct SyncClientRequest { } SyncClientRequest; SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); // step 1 void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len); SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len); -void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2 void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); -SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); +SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); // step 3 cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); char* syncClientRequest2Str(const SyncClientRequest* pMsg); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 6ba27b0d8a3812e97d151bbcf10a47a0ce19ed4c..037a49f45c5e7a43f77d2dfff13edf7be56398fd 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -40,12 +40,13 @@ typedef struct SSyncRaftEntry { } SSyncRaftEntry; SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); -SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 void syncEntryDestory(SSyncRaftEntry* pEntry); -char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); -SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); +void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7 // for debug ---------------------- void syncEntryPrint(const SSyncRaftEntry* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 888d7e16d1152c2a7d9de3718277041e46774eaa..7013d281e3f04a8614663c97ea8990359bac8482 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { assert(pMsg->dataLen >= 0); SyncTerm localPreLogTerm = 0; - if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pEntry != NULL); localPreLogTerm = pEntry->term; @@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; pReply->success = true; - pReply->matchIndex = pMsg->prevLogIndex + 1; + + if (pMsg->dataLen > 0) { + pReply->matchIndex = pMsg->prevLogIndex + 1; + } else { + pReply->matchIndex = pMsg->prevLogIndex; + } SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f9630f7cfe37e4391a9a88568bb76ca53fc301a7..93539db9386dd061fa680a83fc1d2c955f933326 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); + sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, + ths->pRaftStore->currentTerm); return ret; } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 850468f393a6b1ba0e2b3aab6a9dd23eaeaeb4f3..bc2a39aa89cb01b682567aaade00295e63911c23 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -15,6 +15,7 @@ #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftLog.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -42,4 +43,27 @@ void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + + // update commit index + + if (pSyncNode->pFsm != NULL) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + SyncIndex endIndex = SYNC_INDEX_INVALID; + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 4b650ec087024c20f318797399baf20197ae895d..d47a525e2584652a700b2a9d4da2721fa0571dc2 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -54,20 +54,22 @@ static void syncEnvTick(void *param, void *tmrId) { SSyncEnv *pSyncEnv = (SSyncEnv *)param; if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { ++(pSyncEnv->envTickTimerCounter); - sTrace( - "syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); + sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 + ", " + "envTickTimerMS:%d, tmrId:%p", + pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, + pSyncEnv->envTickTimerMS, tmrId); // do something, tick ... taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); } else { - sTrace( - "syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); + sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 + ", " + "envTickTimerMS:%d, tmrId:%p", + pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, + pSyncEnv->envTickTimerMS, tmrId); } } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index c8448c32eb08d3733e9b8e82d6f57ec395d99829..c841c2ac068957bcb82613647cc3ec95a9c7ac2e 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void *syncIOConsumerFunc(void *param); +static void * syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO *io = param; + SSyncIO * io = param; STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bd1b4213c013ebf5cdd8ca69165c29267c8c50e4..1f3e709a27e7001e71f064b75d718dfa8f0b0bc9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -731,14 +731,44 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + assert(pEntry != NULL); + if (ths->state == TAOS_SYNC_STATE_LEADER) { - SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + + // only myself, maybe commit + syncNodeMaybeAdvanceCommitIndex(ths); + + // start replicate right now! syncNodeReplicate(ths); - syncEntryDestory(pEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + } + rpcFreeCont(rpcMsg.pCont); + } else { - // ths->pFsm->FpCommitCb(-1) + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + } + } + rpcFreeCont(rpcMsg.pCont); } + syncEntryDestory(pEntry); return ret; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 453ac13ce338f44448ae6f9154b20d22c66a4fec..3ac4a7a1c083485fe776a92a59c2989b111966a7 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -76,7 +76,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { free(s); } else { - pRoot = syncRpcUnknownMsg2Json(); + pRoot = cJSON_CreateObject(); char* s; s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); cJSON_AddStringToObject(pRoot, "pCont", s); @@ -608,6 +608,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { return pMsg; } +// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); pMsg->originalRpcType = pOriginalRpcMsg->msgType; @@ -652,6 +653,7 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) return pMsg; } +// step 2. SyncClientRequest => RpcMsg, to queue void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -664,6 +666,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +// step 3. RpcMsg => SyncClientRequest, from queue SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); return pMsg; diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 2589c3a5e7aed0ea70c219a8a23771c38d20dd45..e4c5882e9896d423709e2fcc4c8de6245ad908b4 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -26,6 +26,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { return pEntry; } +// step 4. SyncClientRequest => SSyncRaftEntry, add term, index SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); assert(pEntry != NULL); @@ -48,6 +49,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { } } +// step 5. SSyncRaftEntry => bin, to raft log char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { char* buf = malloc(pEntry->bytes); assert(buf != NULL); @@ -58,6 +60,7 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { return buf; } +// step 6. bin => SSyncRaftEntry, from raft log SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SSyncRaftEntry* pEntry = malloc(bytes); @@ -106,6 +109,15 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { return serialized; } +// step 7. SSyncRaftEntry => original SRpcMsg, commit to user, delete seqNum, isWeak, term, index +void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pEntry->originalRpcType; + pRpcMsg->contLen = pEntry->dataLen; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen); +} + // for debug ---------------------- void syncEntryPrint(const SSyncRaftEntry* pObj) { char* serialized = syncEntry2Str(pObj); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index ca04db81b6f636ed3eab0726e248deaac1919b14..0bb701fc09a112234622a3f32830ba12e47445b7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { SyncAppendEntries* pMsg = NULL; SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); if (pEntry != NULL) { - SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); + pMsg = syncAppendEntriesBuild(pEntry->bytes); + assert(pMsg != NULL); // add pEntry into msg uint32_t len; @@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { } else { // maybe overflow, send empty record - SyncAppendEntries* pMsg = syncAppendEntriesBuild(0); + pMsg = syncAppendEntriesBuild(0); + assert(pMsg != NULL); } + assert(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = *pDestId; pMsg->term = pSyncNode->pRaftStore->currentTerm; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 11537c1e98f4513d26b36b3dc65ea51739257e7e..12603bb337ce6791599bd6755756ff3ef2e090a7 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); + sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, + ths->pRaftStore->currentTerm); return ret; } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2583deb32f6c4b3e25a426d29e6f46c821731b34..282176444812d38ef9158b69e7cebea55b136547 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -28,6 +28,8 @@ add_executable(syncRpcMsgTest "") add_executable(syncPingTimerTest2 "") add_executable(syncPingSelfTest "") add_executable(syncElectTest "") +add_executable(syncEncodeTest "") +add_executable(syncWriteTest "") target_sources(syncTest @@ -150,6 +152,14 @@ target_sources(syncElectTest PRIVATE "syncElectTest.cpp" ) +target_sources(syncEncodeTest + PRIVATE + "syncEncodeTest.cpp" +) +target_sources(syncWriteTest + PRIVATE + "syncWriteTest.cpp" +) target_include_directories(syncTest @@ -307,6 +317,16 @@ target_include_directories(syncElectTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEncodeTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncWriteTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -429,6 +449,14 @@ target_link_libraries(syncElectTest sync gtest_main ) +target_link_libraries(syncEncodeTest + sync + gtest_main +) +target_link_libraries(syncWriteTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index b8a1460f3555460c58f98b126b5672f38d80ff5a..502263cfbf736758298961211db6eda5876de2a3 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -116,7 +116,9 @@ int main(int argc, char** argv) { //--------------------------- while (1) { - sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state)); + sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, + gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e554858072bf6a3f71c8ec1c185ead2274a0b852 --- /dev/null +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -0,0 +1,204 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM * pFsm; +SWal * pWal; +SSyncNode *pSyncNode; + +SSyncNode *syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + pWal = walOpen("./wal_test", &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode *syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0() { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 32; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world"); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +SRpcMsg *step2(const SyncClientRequest *pMsg) { + SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + syncClientRequest2RpcMsg(pMsg, pRetMsg); + return pRetMsg; +} + +SyncClientRequest *step3(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestFromRpcMsg2(pMsg); + return pRetMsg; +} + +SSyncRaftEntry *step4(const SyncClientRequest *pMsg) { + SSyncRaftEntry *pRetMsg = syncEntryBuild2((SyncClientRequest *)pMsg, 100, 0); + return pRetMsg; +} + +char *step5(const SSyncRaftEntry *pMsg, uint32_t *len) { + char *pRetMsg = syncEntrySerialize(pMsg, len); + return pRetMsg; +} + +SSyncRaftEntry *step6(const char *pMsg, uint32_t len) { + SSyncRaftEntry *pRetMsg = syncEntryDeserialize(pMsg, len); + return pRetMsg; +} + +SRpcMsg *step7(const SSyncRaftEntry *pMsg) { + SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + syncEntry2OriginalRpc(pMsg, pRetMsg); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + taosRemoveDir("./wal_test"); + + // step0 + SRpcMsg *pMsg0 = step0(); + syncRpcMsgPrint2((char *)"==step0==", pMsg0); + + // step1 + SyncClientRequest *pMsg1 = step1(pMsg0); + syncClientRequestPrint2((char *)"==step1==", pMsg1); + + // step2 + SRpcMsg *pMsg2 = step2(pMsg1); + syncRpcMsgPrint2((char *)"==step2==", pMsg2); + + // step3 + SyncClientRequest *pMsg3 = step3(pMsg2); + syncClientRequestPrint2((char *)"==step3==", pMsg3); + + // step4 + SSyncRaftEntry *pMsg4 = step4(pMsg3); + syncEntryPrint2((char *)"==step4==", pMsg4); + + // log, relog + SSyncNode *pSyncNode = syncNodeInit(); + assert(pSyncNode != NULL); + SSyncRaftEntry *pEntry = pMsg4; + pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); + SSyncRaftEntry *pEntry2 = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, pEntry->index); + syncEntryPrint2((char *)"==pEntry2==", pEntry2); + + // step5 + uint32_t len; + char * pMsg5 = step5(pMsg4, &len); + char * s = syncUtilprintBin(pMsg5, len); + printf("==step5== [%s] \n", s); + free(s); + + // step6 + SSyncRaftEntry *pMsg6 = step6(pMsg5, len); + syncEntryPrint2((char *)"==step6==", pMsg6); + + // step7 + SRpcMsg *pMsg7 = step7(pMsg6); + syncRpcMsgPrint2((char *)"==step7==", pMsg7); + + return 0; +} diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d4c2dde6d06b57322198313b460b8a714833eccd --- /dev/null +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -0,0 +1,180 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM * pFsm; +SWal * pWal; +SSyncNode *gSyncNode; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { + printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); + syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { + printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); + syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { + printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); + syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +SSyncNode *syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./write_test"); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + pWal = walOpen("./write_test_wal", &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode *pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode *syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0() { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 32; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world"); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + taosRemoveDir("./wal_test"); + + initFsm(); + + gSyncNode = syncInitTest(); + assert(gSyncNode != NULL); + syncNodePrint2((char *)"", gSyncNode); + + initRaftId(gSyncNode); + + // step0 + SRpcMsg *pMsg0 = step0(); + syncRpcMsgPrint2((char *)"==step0==", pMsg0); + + // step1 + SyncClientRequest *pMsg1 = step1(pMsg0); + syncClientRequestPrint2((char *)"==step1==", pMsg1); + + for (int i = 0; i < 5; ++i) { + SyncClientRequest *pSyncClientRequest = pMsg1; + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); + gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); + + taosMsleep(1000); + } + + while (1) { + sTrace("while 1 sleep"); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 01e9808aa6dde1ea7d1ef1d48a314c9ced84fe85..97b8912b60f9bc3e39c1be03cb14a35adfe0340c 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -202,6 +202,11 @@ void tfsDirname(const STfsFile *pFile, char *dest) { tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN); } +void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) { + STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); + snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); +} + int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) { diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index c6685ec4691aa6ddcc7b12f45c96cba4432ef327..c0cfd33123380b7c197d79bf6f4a43e607c4d63a 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -378,7 +378,7 @@ class TdeSubProcess: @classmethod def _stopForSure(cls, proc: Popen, sig: int): ''' - Stop a process and all sub processes with a singal, and SIGKILL if necessary + Stop a process and all sub processes with a signal, and SIGKILL if necessary ''' def doKillTdService(proc: Popen, sig: int): Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))