提交 e5e7e713 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1

......@@ -22,6 +22,7 @@
static int running = 1;
static void msg_process(TAOS_RES* msg) {
char buf[1024];
memset(buf, 0, 1024);
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("vg:%d\n", tmq_get_vgroup_id(msg));
while (1) {
......@@ -220,7 +221,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
......
......@@ -324,7 +324,7 @@ typedef struct SEpSet {
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
int8_t connType;
......@@ -1291,10 +1291,14 @@ typedef struct {
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
typedef struct {
int64_t consumerId;
} SMqConsumerLostMsg;
typedef struct {
int32_t topicNum;
int64_t consumerId;
char* consumerGroup;
char cgroup[TSDB_CGROUP_LEN];
SArray* topicNames; // SArray<char*>
} SCMSubscribeReq;
......@@ -1302,7 +1306,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup);
tlen += taosEncodeString(buf, pReq->cgroup);
for (int32_t i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
......@@ -1313,7 +1317,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup);
buf = taosDecodeStringTo(buf, pReq->cgroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for (int32_t i = 0; i < pReq->topicNum; i++) {
char* name;
......@@ -1389,10 +1393,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
}
typedef struct {
const char* key;
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
char key[TSDB_SUBSCRIBE_KEY_LEN];
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
......@@ -1400,7 +1404,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
if (pRebSub == NULL) {
goto _err;
}
pRebSub->key = strdup(key);
strcpy(pRebSub->key, key);
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) {
goto _err;
......@@ -1425,6 +1429,7 @@ _err:
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
typedef struct {
int8_t* mqInReb;
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
......@@ -1936,6 +1941,40 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return buf;
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
char* qmsg;
} SMqRebVgReq;
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeString(buf, pReq->qmsg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeString(buf, &pReq->qmsg);
return (void*)buf;
}
typedef struct {
int8_t reserved;
} SMqRebVgRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
......@@ -2406,6 +2445,7 @@ typedef struct {
int64_t consumerId;
} SMqRspHead;
#if 0
typedef struct {
SMsgHead head;
......@@ -2419,6 +2459,17 @@ typedef struct {
uint64_t reqId;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqPollReq;
#endif
typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t blockingTime;
int64_t currentOffset;
} SMqPollReqV2;
typedef struct {
int32_t vgId;
......@@ -2492,13 +2543,81 @@ static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRs
return (void*)buf;
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*>
SArray* blockSchema; // SArray<SSchemaWrapper>
SArray* blockTags; // SArray<kvrow>
SArray* blockTagSchema; // SArray<kvrow>
} SMqDataBlkRsp;
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->blockNum);
if (pRsp->blockNum != 0) {
tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag);
tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
void* data = taosArrayGetP(pRsp->blockData, i);
tlen += taosEncodeFixedI32(buf, bLen);
tlen += taosEncodeBinary(buf, data, bLen);
}
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0;
void* data = NULL;
buf = taosDecodeFixedI32(buf, &bLen);
buf = taosDecodeBinary(buf, &data, bLen);
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
}
}
return (void*)buf;
}
typedef struct {
SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
......
......@@ -146,6 +146,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
......@@ -177,6 +178,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CANCEL_CONN, "vnode-mq-mv-cancel-conn", SMqCancelConnReq, SMqCancelConnRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
......
......@@ -92,7 +92,13 @@ extern "C" {
typedef struct SMnode SMnode;
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum {
SDB_KEY_BINARY = 1,
SDB_KEY_INT32 = 2,
SDB_KEY_INT64 = 3,
} EKeyType;
typedef enum {
SDB_STATUS_INIT = 0,
SDB_STATUS_CREATING = 1,
......
......@@ -27,6 +27,11 @@ extern "C" {
typedef int32_t (*__compar_fn_t)(const void *, const void *);
#endif
typedef void *(*FCopy)(void *);
typedef void (*FDelete)(void *);
typedef int32_t (*FEncode)(void **buf, const void *dst);
typedef void *(*FDecode)(const void *buf, void *dst);
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
......
......@@ -279,6 +279,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EB)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
......@@ -41,10 +41,10 @@ extern "C" {
#define TARRAY_GET_START(array) ((array)->pData)
typedef struct SArray {
size_t size;
size_t size;
uint32_t capacity;
uint32_t elemSize;
void* pData;
void* pData;
} SArray;
/**
......@@ -199,6 +199,13 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize);
*/
SArray* taosArrayDup(const SArray* pSrc);
/**
* deep copy a new array
* @param pSrc
*/
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy);
/**
* clear the array (remove all element)
* @param pArray
......@@ -212,19 +219,9 @@ void taosArrayClear(SArray* pArray);
*/
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
/**
* destroy array list
* @param pArray
*/
void* taosArrayDestroy(SArray* pArray);
/**
*
* @param pArray
* @param fp
*/
void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*));
void taosArrayDestroyP(SArray* pArray, FDelete fp);
void taosArrayDestroyEx(SArray* pArray, FDelete fp);
/**
* sort the array
......@@ -272,6 +269,9 @@ char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
#ifdef __cplusplus
}
#endif
......
......@@ -60,6 +60,7 @@ extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t fsDebugFlag;
extern int32_t metaDebugFlag;
extern int32_t fnDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();
......
......@@ -44,7 +44,7 @@ extern "C" {
} while (0)
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
#define HEARTBEAT_INTERVAL 1500 // ms
enum {
RES_TYPE__QUERY = 1,
......@@ -187,11 +187,13 @@ typedef struct SRequestSendRecvBody {
} SRequestSendRecvBody;
typedef struct {
int8_t resType;
char* topic;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
int32_t vgId;
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SMqDataBlkRsp rsp;
SReqResultInfo resInfo;
} SMqRspObj;
typedef struct SRequestObj {
......@@ -211,16 +213,24 @@ typedef struct SRequestObj {
SRequestSendRecvBody body;
} SRequestObj;
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter;
return (SReqResultInfo*)taosArrayGet(msg->res, resIter);
return (SReqResultInfo*)&msg->resInfo;
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) {
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* msg = (SMqRspObj*)res;
if (++msg->resIter < taosArrayGetSize(msg->res)) {
return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter);
msg->resIter++;
if (msg->resIter < msg->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
return &msg->resInfo;
}
return NULL;
}
......@@ -238,25 +248,25 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init();
int taos_init();
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
STscObj *acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId();
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj *acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void resetConnectDB(STscObj* pTscObj);
int taos_options_imp(TSDB_OPTION option, const char* str);
int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
......@@ -273,12 +283,6 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
......@@ -290,7 +294,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
// conn level
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......
......@@ -14,12 +14,12 @@
*/
#include "catalog.h"
#include "scheduler.h"
#include "clientInt.h"
#include "clientStmt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
......@@ -177,25 +177,24 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return doFetchRows(pRequest, true, true);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
if (msg->resIter == -1) msg->resIter++;
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
} else {
pResultInfo = tmqGetCurResInfo(res);
}
if (pResultInfo->current < pResultInfo->numOfRows) {
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return pResultInfo->row;
} else {
msg->resIter++;
if (msg->resIter < taosArrayGetSize(msg->res)) {
pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return pResultInfo->row;
} else {
return NULL;
}
pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return NULL;
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return pResultInfo->row;
}
} else {
// assert to avoid un-initialization error
ASSERT(0);
......@@ -455,7 +454,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
(*numOfRows) = pResultInfo->numOfRows;
return pRequest->code;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return -1;
pResultInfo->current = pResultInfo->numOfRows;
......@@ -474,7 +473,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false);
if (pResultInfo == NULL) {
(*numOfRows) = 0;
return 0;
......@@ -710,10 +709,8 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return stmtBindBatch(stmt, bind);
}
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
// TODO
return NULL;
}
......@@ -72,25 +72,25 @@ struct tmq_conf_t {
struct tmq_t {
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
int8_t inWaiting;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
/*int8_t inWaiting;*/
int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg;
int64_t status;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
int8_t epStatus;
int32_t epSkipCnt;
int32_t waitingRequest;
int32_t readyRequest;
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
tsem_t rspSem;
/*int32_t nextTopicIdx;*/
int8_t epStatus;
int32_t epSkipCnt;
/*int32_t waitingRequest;*/
/*int32_t readyRequest;*/
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
tsem_t rspSem;
// stat
int64_t pollCnt;
};
......@@ -134,7 +134,7 @@ typedef struct {
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqPollRspV2 msg;
SMqDataBlkRsp msg;
} SMqPollRspWrapper;
typedef struct {
......@@ -145,6 +145,7 @@ typedef struct {
typedef struct {
tmq_t* tmq;
int32_t code;
int32_t sync;
tsem_t rspSem;
} SMqAskEpCbParam;
......@@ -255,7 +256,12 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
void tmq_list_destroy(tmq_list_t* list) {
SArray* container = &list->container;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
int32_t sz = taosArrayGetSize(container);
for (int32_t i = 0; i < sz; i++) {
char* str = taosArrayGetP(container, i);
taosMemoryFree(str);
}
taosArrayDestroy(container);
}
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
......@@ -322,12 +328,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return NULL;
}
pTmq->pTscObj = (STscObj*)conn;
pTmq->inWaiting = 0;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
// set conf
......@@ -367,12 +373,12 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
pTmq->inWaiting = 0;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
// set conf
......@@ -496,7 +502,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
strcpy(req.cgroup, tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) {
......@@ -857,7 +863,6 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
#endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("recv poll\n");*/
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
......@@ -870,17 +875,15 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
// do not write into queue since updating epoch reset
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
tmqEpoch);
/*tsem_post(&tmq->rspSem);*/
return 0;
}
if (msgEpoch != tmqEpoch) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
} else {
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
}
#if 0
......@@ -902,45 +905,33 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
/*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
goto CREATE_MSG_FAIL;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
/*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/
taosFreeQitem(pRsp);
goto CREATE_MSG_FAIL;
}
#endif
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
taosWriteQitem(tmq->mqueue, pRspWrapper);
atomic_add_fetch_32(&tmq->readyRequest, 1);
/*tsem_post(&tmq->rspSem);*/
return 0;
return 0;
CREATE_MSG_FAIL:
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
/*tsem_post(&tmq->rspSem);*/
return code;
return -1;
}
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
......@@ -1023,6 +1014,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
pParam->code = code;
if (code != 0) {
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
goto END;
......@@ -1062,6 +1054,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
taosWriteQitem(tmq->mqueue, pWrapper);
/*tsem_post(&tmq->rspSem);*/
taosMemoryFree(pParam);
}
END:
......@@ -1073,7 +1066,8 @@ END:
}
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
int32_t code = 0;
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
......@@ -1130,8 +1124,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (sync) tsem_wait(&pParam->rspSem);
return 0;
if (sync) {
tsem_wait(&pParam->rspSem);
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
}
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
......@@ -1157,7 +1155,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return TMQ_RESP_ERR__FAIL;
}
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
......@@ -1169,13 +1167,18 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
reqOffset = tmq->resetOffsetCfg;
}
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2));
if (pReq == NULL) {
return NULL;
}
strcpy(pReq->topic, pTopic->topicName);
strcpy(pReq->cgroup, tmq->groupId);
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
int32_t tlen = strlen(tmq->groupId);
memcpy(pReq->subKey, tmq->groupId, tlen);
pReq->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->blockingTime = blockingTime;
pReq->consumerId = tmq->consumerId;
......@@ -1184,101 +1187,26 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
pReq->reqId = generateRequestId();
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqPollReq));
pReq->head.contLen = htonl(sizeof(SMqPollReqV2));
return pReq;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
pRspObj->resIter = -1;
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
SMqPollRspV2* pRsp = &pWrapper->msg;
int32_t blockNum = taosArrayGetSize(pRsp->blockPos);
pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
for (int32_t i = 0; i < blockNum; i++) {
int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
SReqResultInfo resInfo = {0};
resInfo.totalRows = 0;
resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
setQueryResultFromRsp(&resInfo, pRetrieve, true);
taosArrayPush(pRspObj->res, &resInfo);
}
return pRspObj;
}
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
/*continue;*/
/*}*/
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
}
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->epoch = tmq->epoch;
pParam->sync = 1;
pParam->msg = &msg;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
return NULL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_CONSUME;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
tsem_wait(&pParam->rspSem);
tmq_message_t* nmsg = NULL;
while (1) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
if (nmsg == NULL) continue;
while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
}
return nmsg;
}
}
}
return NULL;
taosFreeQitem(pWrapper);
return pRspObj;
}
#endif
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
/*printf("call poll\n");*/
......@@ -1301,7 +1229,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
/*tsem_post(&tmq->rspSem);*/
......@@ -1332,7 +1260,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.len = sizeof(SMqPollReqV2),
.handle = NULL,
};
sendInfo->requestId = pReq->reqId;
......@@ -1343,7 +1271,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
......@@ -1385,7 +1313,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
atomic_sub_fetch_32(&tmq->readyRequest, 1);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/
......@@ -1393,7 +1321,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->msg.dataLen == 0) {
if (pollRspWrapper->msg.blockNum == 0) {
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
......@@ -1449,7 +1377,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) {
tscDebug("not ready, retry\n");
taosSsleep(1);
}
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) {
......
......@@ -289,6 +289,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1;
return 0;
}
......@@ -473,6 +474,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
}
static int32_t taosSetClientCfg(SConfig *pCfg) {
......
......@@ -125,14 +125,14 @@ int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
return tlen;
}
void *taosDecodeSEpSet(void *buf, SEpSet *pEp) {
void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
return (void *)buf;
}
static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
......@@ -3643,8 +3643,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
......
......@@ -211,6 +211,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
......
......@@ -250,7 +250,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, DEFAULT_HANDLE);
......@@ -263,10 +263,11 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE);
......
......@@ -22,27 +22,30 @@
extern "C" {
#endif
enum {
MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__ACTIVE,
// MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__MODIFY = 1,
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
// MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__MODIFY
MQ_CONSUMER_STATUS__LOST_IN_REB,
MQ_CONSUMER_STATUS__LOST_REBD,
};
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup);
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
#ifdef __cplusplus
}
#endif
......
......@@ -88,6 +88,7 @@ typedef enum {
TRN_TYPE_CREATE_STREAM = 1019,
TRN_TYPE_DROP_STREAM = 1020,
TRN_TYPE_ALTER_STREAM = 1021,
TRN_TYPE_CONSUMER_LOST = 1022,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
......@@ -511,6 +512,7 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset)
return buf;
}
#if 0
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status;
......@@ -641,6 +643,7 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
pSub->unassignedVg = NULL;
}
}
#endif
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......@@ -659,6 +662,7 @@ typedef struct {
SSchemaWrapper schema;
} SMqTopicObj;
#if 0
typedef struct {
int64_t consumerId;
int64_t connId;
......@@ -713,7 +717,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
......@@ -721,7 +725,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
......@@ -729,6 +733,132 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
return buf;
}
#endif
enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__MODIFY,
};
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
// hbStatus is not applicable to serialization
int32_t hbStatus;
// lock is used for topics update
SRWLatch lock;
SArray* currentTopics; // SArray<char*>
#if 0
SArray* waitingRebTopics; // SArray<char*>
#endif
SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*>
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer);
typedef struct {
int32_t vgId;
char* qmsg;
// char topic[TSDB_TOPIC_FNAME_LEN];
SEpSet epSet;
} SMqVgEp;
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
void tDeleteSMqVgEp(SMqVgEp* pVgEp);
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
typedef struct {
int64_t consumerId; // -1 for unassigned
SArray* vgs; // SArray<SMqVgEp*>
} SMqConsumerEpInSub;
SMqConsumerEpInSub* tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub* pEpInSub);
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub* pEpInSub);
int32_t tEncodeSMqConsumerEpInSub(void** buf, const SMqConsumerEpInSub* pEpInSub);
void* tDecodeSMqConsumerEpInSub(const void* buf, SMqConsumerEpInSub* pEpInSub);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
SRWLatch lock;
int32_t vgNum;
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
} SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);
typedef struct {
int32_t epoch;
SArray* consumers; // SArray<SMqConsumerEpInSub*>
} SMqSubActionLogEntry;
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
SArray* logs; // SArray<SMqSubActionLogEntry*>
} SMqSubActionLogObj;
SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
SRWLatch lock;
SArray* vgs; // SArray<SMqVgEp*>
} SMqConsumerEpObj;
SMqConsumerEpObj* tCloneSMqConsumerEpObj(const SMqConsumerEpObj* pConsumerEp);
void tDeleteSMqConsumerEpObj(SMqConsumerEpObj* pConsumerEp);
int32_t tEncodeSMqConsumerEpObj(void** buf, const SMqConsumerEpObj* pConsumerEp);
void* tDecodeSMqConsumerEpObj(const void* buf, SMqConsumerEpObj* pConsumerEp);
typedef struct {
const SMqSubscribeObj* pOldSub;
const SMqTopicObj* pTopic;
const SMqRebSubscribe* pRebInfo;
} SMqRebInputObj;
typedef struct {
int64_t oldConsumerId;
int64_t newConsumerId;
SMqVgEp* pVgEp;
} SMqRebOutputVg;
#if 0
typedef struct {
int64_t consumerId;
} SMqRebOutputConsumer;
#endif
typedef struct {
SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* touchedConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub;
SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......
......@@ -31,7 +31,7 @@ void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
......
......@@ -26,9 +26,11 @@ int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
#ifdef __cplusplus
}
#endif
......
......@@ -14,6 +14,403 @@
*/
#include "mndDef.h"
#include "mndConsumer.h"
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
return NULL;
}
pConsumer->consumerId = consumerId;
memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pConsumer->hbStatus = 0;
taosInitRWLatch(&pConsumer->lock);
pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
#if 0
pConsumer->waitingRebTopics = NULL;
#endif
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL) {
taosArrayDestroy(pConsumer->currentTopics);
taosArrayDestroy(pConsumer->rebNewTopics);
taosArrayDestroy(pConsumer->rebRemovedTopics);
taosMemoryFree(pConsumer);
return NULL;
}
return pConsumer;
}
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
if (pConsumer->currentTopics) {
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
}
#if 0
if (pConsumer->waitingRebTopics) {
taosArrayDestroyP(pConsumer->waitingRebTopics, taosMemoryFree);
}
#endif
if (pConsumer->rebNewTopics) {
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
}
if (pConsumer->rebRemovedTopics) {
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
}
}
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
int32_t tlen = 0;
int32_t sz;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeString(buf, pConsumer->cgroup);
tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
tlen += taosEncodeFixedI32(buf, pConsumer->status);
// current topics
if (pConsumer->currentTopics) {
sz = taosArrayGetSize(pConsumer->currentTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#if 0
// waiting reb topics
if (pConsumer->waitingRebTopics) {
sz = taosArrayGetSize(pConsumer->waitingRebTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->waitingRebTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#endif
// reb new topics
if (pConsumer->rebNewTopics) {
sz = taosArrayGetSize(pConsumer->rebNewTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
// reb removed topics
if (pConsumer->rebRemovedTopics) {
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
return tlen;
}
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
buf = taosDecodeFixedI32(buf, &pConsumer->status);
// current topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->currentTopics, &topic);
}
#if 0
// waiting reb topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->waitingRebTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->waitingRebTopics, &topic);
}
#endif
// reb new topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->rebNewTopics, &topic);
}
// reb removed topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->rebRemovedTopics, &topic);
}
return (void *)buf;
}
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId;
pVgEpNew->qmsg = strdup(pVgEp->qmsg);
/*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/
pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew;
}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeString(buf, pVgEp->qmsg);
/*tlen += taosEncodeString(buf, pVgEp->topic);*/
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeString(buf, &pVgEp->qmsg);
/*buf = taosDecodeStringTo(buf, pVgEp->topic);*/
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return (void *)buf;
}
SMqConsumerEpObj *tCloneSMqConsumerEpObj(const SMqConsumerEpObj *pConsumerEp) {
SMqConsumerEpObj *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEpObj));
if (pConsumerEpNew == NULL) return NULL;
pConsumerEpNew->consumerId = pConsumerEp->consumerId;
memcpy(pConsumerEpNew->cgroup, pConsumerEp->cgroup, TSDB_CGROUP_LEN);
taosInitRWLatch(&pConsumerEpNew->lock);
pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpNew->vgs, (FCopy)tCloneSMqVgEp);
return pConsumerEpNew;
}
void tDeleteSMqConsumerEpObj(SMqConsumerEpObj *pConsumerEp) {
taosArrayDestroyEx(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSMqConsumerEpObj(void **buf, const SMqConsumerEpObj *pConsumerEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeString(buf, pConsumerEp->cgroup);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
return tlen;
}
void *tDecodeSMqConsumerEpObj(const void *buf, SMqConsumerEpObj *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeStringTo(buf, pConsumerEp->cgroup);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
return (void *)buf;
}
SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) {
SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
if (pEpInSubNew == NULL) return NULL;
pEpInSubNew->consumerId = pEpInSub->consumerId;
pEpInSubNew->vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp);
return pEpInSubNew;
}
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) {
taosArrayDestroyEx(pEpInSub->vgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId);
int32_t sz = taosArrayGetSize(pEpInSub->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i);
tlen += tEncodeSMqVgEp(buf, pVgEp);
}
/*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/
return tlen;
}
void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId);
/*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pEpInSub->vgs = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
buf = tDecodeSMqVgEp(buf, pVgEp);
taosArrayPush(pEpInSub->vgs, &pVgEp);
}
return (void *)buf;
}
SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = -1;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set free fp
SMqConsumerEpInSub epInSub = {
.consumerId = -1,
.vgs = taosArrayInit(0, sizeof(void *)),
};
int64_t unexistKey = -1;
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
return pSubNew;
}
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = pSub->vgNum;
/*pSubNew->consumerEps = taosArrayDeepCopy(pSub->consumerEps, (FCopy)tCloneSMqConsumerEpInSub);*/
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
void *pIter = NULL;
SMqConsumerEpInSub *pEpInSub = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pEpInSub = (SMqConsumerEpInSub *)pIter;
SMqConsumerEpInSub newEp = {
.consumerId = pEpInSub->consumerId,
.vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp),
};
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEpInSub));
}
return pSubNew;
}
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
/*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/
taosHashCleanup(pSub->consumerHash);
}
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash);
tlen += taosEncodeFixedI32(buf, sz);
int32_t cnt = 0;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
tlen += tEncodeSMqConsumerEpInSub(buf, pEpInSub);
cnt++;
}
ASSERT(cnt == sz);
/*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/
return tlen;
}
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
//
buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
for (int32_t i = 0; i < sz; i++) {
/*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/
SMqConsumerEpInSub epInSub = {0};
buf = tDecodeSMqConsumerEpInSub(buf, &epInSub);
taosHashPut(pSub->consumerHash, &epInSub.consumerId, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
}
/*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/
return (void *)buf;
}
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
if (pEntryNew == NULL) return NULL;
pEntryNew->epoch = pEntry->epoch;
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEpInSub);
return pEntryNew;
}
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEpInSub);
}
int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pEntry->epoch);
tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
return tlen;
}
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
buf = taosDecodeFixedI32(buf, &pEntry->epoch);
buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
return (void *)buf;
}
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
if (pLogNew == NULL) return pLogNew;
memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEpInSub);
return pLogNew;
}
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEpInSub);
}
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pLog->key);
tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
return tlen;
}
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
buf = taosDecodeStringTo(buf, pLog->key);
buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
return (void *)buf;
}
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
......
......@@ -130,8 +130,7 @@ OFFSET_DECODE_OVER:
return pRow;
}
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
int32_t code = 0;
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i);
......@@ -170,13 +169,22 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
return -1;
}
bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
ASSERT(pOffsetObj);
if (pOffsetObj == NULL) {
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset));
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true;
}
pOffsetObj->offset = pOffset->offset;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pOffsetRaw);
mndReleaseOffset(pMnode, pOffsetObj);
if (create) {
taosMemoryFree(pOffsetObj);
} else {
mndReleaseOffset(pMnode, pOffsetObj);
}
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
......@@ -201,7 +209,7 @@ static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
mTrace("offset:%s, perform update action", pOldOffset->key);
pOldOffset->offset = pNewOffset->offset;
atomic_store_64(&pOldOffset->offset, pNewOffset->offset);
return 0;
}
......
......@@ -434,7 +434,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1;
}
ASSERT(pSub->vgNum == 0);
ASSERT(pSub->vgNum == -1);
pSub->vgNum = 0;
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
......@@ -453,6 +455,12 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
int64_t unexistKey = -1;
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
......@@ -466,24 +474,41 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
pVgEp->epSet = plan->execNode.epSet;
pVgEp->vgId = plan->execNode.nodeId;
#if 0
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
mDebug("init subscribption %s, assign vg: %d", pSub->key, consumerEp.vgId);
#endif
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
int32_t msgLen;
if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(pSub->unassignedVg, &consumerEp);
taosArrayPush(pEpInSub->vgs, &pVgEp);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
ASSERT(pEpInSub->vgs->size > 0);
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub->vgs->size > 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
qDestroyQueryPlan(pPlan);
return 0;
......
......@@ -22,12 +22,14 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndGrant.h"
#include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPerfSchema.h"
#include "mndProfile.h"
#include "mndQnode.h"
#include "mndQuery.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndSnode.h"
......@@ -40,8 +42,6 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndQuery.h"
#include "mndGrant.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
......
......@@ -16,6 +16,13 @@
#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
#include "ttimer.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -30,12 +37,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
#define TQ_BUFFER_SIZE 4
#define TQ_BUCKET_MASK 0xFF
......@@ -151,22 +152,27 @@ typedef struct {
} STqMetaStore;
typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
char* qmsg;
// SRWLatch lock;
SWalReadHandle* pReadHandle;
// number should be identical to fetch thread num
qTaskInfo_t task[4];
} STqExec;
struct STQ {
// the collection of groups
// the handle of meta kvstore
bool writeTrigger;
char* path;
STqMemRef tqMemRef;
STqMetaStore* tqMeta;
// STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
SHashObj* tqMetaNew; // subKey -> tqExec
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
};
typedef struct {
......@@ -230,10 +236,6 @@ typedef struct {
// TODO sync function
} STqStreamPusher;
typedef struct {
int8_t type; // mq or stream
} STqPusher;
typedef struct {
SHashObj* pHash; // <id, STqPush*>
} STqPushMgr;
......@@ -257,9 +259,10 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
// int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
// int32_t tqProcessRebReq(STQ* pTq, char* msg);
// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
......@@ -314,4 +317,4 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
}
#endif
#endif /*_TD_VNODE_TQ_H_*/
\ No newline at end of file
#endif /*_TD_VNODE_TQ_H_*/
......@@ -33,20 +33,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta) {
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq);
#if 0
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
return NULL;
}
#if 0
pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) {
// free store
taosMemoryFree(pTq);
return NULL;
}
#endif
pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
......@@ -241,16 +231,15 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0;
}
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
SMqPollReqV2* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
......@@ -260,65 +249,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRsp rsp = {
/*.consumerId = consumerId,*/
.numOfTopics = 0,
.pBlockData = NULL,
};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey));
ASSERT(pExec);
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t sz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < sz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
TD_VID(pTq->pVnode));
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0;
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockData = taosArrayInit(0, sizeof(void*));
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
consumerEpoch = atomic_load_32(&pExec->epoch);
if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
......@@ -326,101 +279,88 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID(pTq->pVnode), fetchOffset);
break;
}
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
qTaskInfo_t task = pExec->task[workerId];
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
ASSERT(0);
}
if (pDataBlock == NULL) break;
taosArrayPush(pRes, pDataBlock);
}
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++;
rsp.skipLogNum++;
taosArrayDestroy(pRes);
continue;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
}
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
// TODO batch optimization
if (rsp.blockNum != 0) break;
rsp.skipLogNum++;
fetchOffset++;
}
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rsp.skipLogNum++;
}
}
if (rsp.blockNum != 0)
rsp.rspOffset = fetchOffset;
else
rsp.rspOffset = fetchOffset - 1;
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
rsp.rspOffset = fetchOffset - 1;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
tEncodeSMqDataBlkRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
return 0;
}
#endif
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
......@@ -429,7 +369,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
......@@ -628,7 +568,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
}
#endif
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req;
tDecodeSMqRebVgReq(msg, &req);
// todo lock
STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey));
if (pExec == NULL) {
ASSERT(req.oldConsumerId == -1);
ASSERT(req.newConsumerId != -1);
STqExec exec = {0};
pExec = &exec;
/*taosInitRWLatch(&pExec->lock);*/
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pExec->consumerId = req.newConsumerId;
pExec->epoch = -1;
pExec->qmsg = req.qmsg;
req.qmsg = NULL;
pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 4; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnodeMeta,
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
}
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;
} else {
/*if (req.newConsumerId != -1) {*/
/*taosWLockLatch(&pExec->lock);*/
ASSERT(pExec->consumerId == req.oldConsumerId);
// TODO handle qmsg and exec modification
atomic_store_32(&pExec->epoch, -1);
atomic_store_64(&pExec->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pExec->epoch, 1);
/*taosWUnLockLatch(&pExec->lock);*/
return 0;
/*} else {*/
// TODO
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
/*return 0;*/
/*}*/
}
}
#if 0
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
terrno = TSDB_CODE_SUCCESS;
......@@ -747,6 +737,7 @@ int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
#endif
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0;
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
// TODO:replace by an abstract file layer
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
......
......@@ -90,6 +90,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vnodeProcessSubmitReq(pVnode, ptr, pRsp);
break;
/* TQ */
case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
// TODO: handle error
}
break;
#if 0
case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error
......@@ -103,6 +110,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
}
} break;
#endif
case TDMT_VND_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
......@@ -303,4 +311,4 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
pRsp->contLen = sizeof(SSubmitRsp);
return 0;
}
\ No newline at end of file
}
//
// Created by slzhou on 22-4-20.
//
#ifndef TDENGINE_FNLOG_H
#define TDENGINE_FNLOG_H
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
#define fnFatal(...) { if (fnDebugFlag & DEBUG_FATAL) { taosPrintLog("FN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define fnError(...) { if (fnDebugFlag & DEBUG_ERROR) { taosPrintLog("FN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define fnWarn(...) { if (fnDebugFlag & DEBUG_WARN) { taosPrintLog("FN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define fnInfo(...) { if (fnDebugFlag & DEBUG_INFO) { taosPrintLog("FN ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define fnDebug(...) { if (fnDebugFlag & DEBUG_DEBUG) { taosPrintLog("FN ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define fnTrace(...) { if (fnDebugFlag & DEBUG_TRACE) { taosPrintLog("FN ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_FNLOG_H
......@@ -26,6 +26,9 @@
extern "C" {
#endif
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock."
//======================================================================================
//begin API to taosd and qworker
......@@ -35,17 +38,28 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -3,
};
/*TODO: no api for dnode startudfd/stopudfd*/
/**
* start udfd dameon service
*/
int32_t startUdfd(int32_t dnodeId);
/**
* stop udfd dameon service
*/
int32_t stopUdfd(int32_t dnodeId);
/**
* start udf dameon service
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code
*/
int32_t startUdfService();
int32_t createUdfdProxy(int32_t dnodeId);
/**
* stop udf dameon service
* destroy udfd proxy
* @return error code
*/
int32_t stopUdfService();
int32_t destroyUdfdProxy(int32_t dnodeId);
typedef void *UdfHandle;
......@@ -101,7 +115,6 @@ typedef struct SUdfInterBuf {
char* buf;
} SUdfInterBuf;
//TODO: translate these calls to callUdf
// output: interBuf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
// input: block, state
......
......@@ -32,9 +32,9 @@ typedef struct SUdfInfo {
typedef void *UdfHandle;
int32_t startUdfService();
int32_t createUdfdProxy();
int32_t stopUdfService();
int32_t destroyUdfdProxy();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
......
......@@ -200,10 +200,10 @@ int64_t gUdfTaskSeqNum = 0;
enum {
UDFC_STATE_INITAL = 0, // initial state
UDFC_STATE_STARTNG, // starting after startUdfService
UDFC_STATE_STARTNG, // starting after createUdfdProxy
UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
UDFC_STATE_STOPPING, // stopping after stopUdfService
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
UDFC_STATUS_FINAL, // stopped
};
int8_t gUdfcState = UDFC_STATE_INITAL;
......@@ -929,7 +929,7 @@ void udfStopAsyncCb(uv_async_t *async) {
}
}
int32_t startUdfd();
int32_t udfcSpawnUdfd();
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
//TODO: pipe close will be first received
......@@ -944,12 +944,12 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
cleanUpUvTasks();
startUdfd();
//cleanUpUvTasks();
udfcSpawnUdfd();
}
}
int32_t startUdfd() {
int32_t udfcSpawnUdfd() {
//TODO: path
uv_process_options_t options = {0};
static char path[256] = {0};
......@@ -979,9 +979,6 @@ int32_t startUdfd() {
void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop);
//TODO spawn error
startUdfd();
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex);
......@@ -994,7 +991,7 @@ void constructUdfService(void *argsThread) {
}
int32_t startUdfService() {
int32_t createUdfdProxy(int32_t dnodeId) {
gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
......@@ -1002,12 +999,12 @@ int32_t startUdfService() {
return 0;
}
int32_t stopUdfService() {
int32_t destroyUdfdProxy(int32_t dnodeId) {
gUdfcState = UDFC_STATE_STOPPING;
uv_barrier_destroy(&gUdfInitBarrier);
if (gUdfcState == UDFC_STATE_STOPPING) {
uv_process_kill(&gUdfdProcess, SIGINT);
}
// if (gUdfcState == UDFC_STATE_STOPPING) {
// uv_process_kill(&gUdfdProcess, SIGINT);
// }
uv_async_send(&gUdfLoopStopAsync);
uv_thread_join(&gUdfLoopThread);
uv_mutex_destroy(&gUdfTaskQueueMutex);
......
此差异已折叠。
......@@ -8,7 +8,7 @@
#include "tdatablock.h"
int main(int argc, char *argv[]) {
startUdfService();
createUdfdProxy(1);
uv_sleep(1000);
char path[256] = {0};
size_t cwdSize = 256;
......@@ -53,5 +53,5 @@ int main(int argc, char *argv[]) {
}
teardownUdf(handle);
stopUdfService();
destroyUdfdProxy(1);
}
......@@ -38,7 +38,7 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
#include <sys/file.h>
#if !defined(_TD_DARWIN_64)
#include <sys/sendfile.h>
#include <sys/sendfile.h>
#endif
#include <sys/stat.h>
#include <unistd.h>
......@@ -58,9 +58,9 @@ typedef int32_t FileFd;
typedef struct TdFile {
TdThreadRwlock rwlock;
int refId;
FileFd fd;
FILE *fp;
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
#define FILE_WITH_LOCK 1
......@@ -182,7 +182,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
return 0;
#else
struct stat fileStat;
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
if (code < 0) {
return code;
}
......@@ -203,7 +203,7 @@ int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) {
return 0;
#else
struct stat fileStat;
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
if (code < 0) {
return code;
}
......@@ -226,7 +226,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return NULL;
#else
int fd = -1;
int fd = -1;
FILE *fp = NULL;
if (tdFileOptions & TD_FILE_STREAM) {
char *mode = NULL;
......@@ -325,7 +325,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t leftbytes = count;
int64_t readbytes;
char *tbuf = (char *)buf;
......@@ -365,7 +365,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t ret = pread(pFile->fd, buf, count, offset);
#if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock));
......@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
int64_t nleft = count;
int64_t nwritten = 0;
......@@ -414,7 +414,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t ret = lseek(pFile->fd, (long)offset, whence);
#if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock));
......@@ -429,10 +429,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat);
int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) {
return code;
}
......@@ -456,7 +456,7 @@ int32_t taosLockFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB);
#endif
......@@ -469,7 +469,7 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB);
#endif
......@@ -529,7 +529,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return ftruncate(pFile->fd, l_size);
#endif
......@@ -637,7 +637,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
}
off_t len = count;
while (len > 0) {
char buf[1024 * 16];
char buf[1024 * 16];
off_t n = sizeof(buf);
if (len < n) n = len;
size_t m = fread(buf, 1, n, in_file);
......@@ -662,7 +662,7 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
}
off_t len = count;
while (len > 0) {
char buf[1024 * 16];
char buf[1024 * 16];
off_t n = sizeof(buf);
if (len < n) n = len;
size_t m = read(sfd, buf, n);
......@@ -750,7 +750,7 @@ void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length) {
if (pFile == NULL) {
return NULL;
}
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, pFile->fd, 0);
return ptr;
......@@ -811,4 +811,4 @@ bool taosCheckAccessFile(const char *pathname, int32_t tdFileAccessOptions) {
bool taosCheckExistFile(const char *pathname) { return taosCheckAccessFile(pathname, TD_FILE_ACCESS_EXIST_OK); };
#endif // WINDOWS
#endif // WINDOWS
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "tarray.h"
#include "tcoding.h"
SArray* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0);
......@@ -75,7 +76,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
}
void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
if (pArray == NULL || pData == NULL) {
if (pData == NULL) {
return NULL;
}
......@@ -317,7 +318,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0;
}
void* taosArrayDestroy(SArray* pArray) {
if (pArray) {
taosMemoryFree(pArray->pData);
......@@ -327,7 +327,14 @@ void* taosArrayDestroy(SArray* pArray) {
return NULL;
}
void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) {
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
taosArrayDestroy(pArray);
}
void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
if (pArray == NULL) {
return;
}
......@@ -436,6 +443,39 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
return;
}
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
ASSERT(pSrc->elemSize == sizeof(void*));
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
for (int32_t i = 0; i < pSrc->size; i++) {
void* clone = deepCopy(taosArrayGetP(pSrc, i));
taosArrayPush(pArray, &clone);
}
return pArray;
}
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) {
int32_t tlen = 0;
int32_t sz = pArray->size;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
void* data = taosArrayGetP(pArray, i);
tlen += encode(buf, data);
}
return tlen;
}
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
*pArray = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
void* data = taosMemoryCalloc(1, dataSz);
buf = decode(buf, data);
taosArrayPush(*pArray, &data);
}
return (void*)buf;
}
// order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
......
......@@ -92,6 +92,7 @@ int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 135;
int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135;
int32_t fnDebugFlag = 135;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
......@@ -753,6 +754,7 @@ void taosSetAllDebugFlag(int32_t flag) {
tsdbDebugFlag = flag;
tqDebugFlag = flag;
fsDebugFlag = flag;
fnDebugFlag = flag;
uInfo("all debug flag are set to %d", flag);
}
......@@ -33,8 +33,8 @@ endi
sql connect
$dbNamme = d0
print =============== create database , vgroup 1
sql create database $dbNamme vgroups 1
print =============== create database , vgroup 4
sql create database $dbNamme vgroups 4
sql use $dbNamme
print =============== create super table
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册