提交 3f6ce2da 编写于 作者: L Liu Jicong

feat(tmq): support topic with meta

上级 c409760f
......@@ -252,6 +252,16 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
};
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
......
......@@ -34,6 +34,7 @@ enum {
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__END_RSP,
};
......
......@@ -1560,6 +1560,7 @@ typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists;
int8_t subType;
int8_t withMeta;
char* sql;
char subDbName[TSDB_DB_FNAME_LEN];
union {
......@@ -2306,6 +2307,7 @@ typedef struct {
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
int8_t withMeta;
char* qmsg;
int64_t suid;
} SMqRebVgReq;
......@@ -2318,6 +2320,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeFixedI8(buf, pReq->subType);
tlen += taosEncodeFixedI8(buf, pReq->withMeta);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
tlen += taosEncodeString(buf, pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
......@@ -2333,6 +2336,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeFixedI8(buf, &pReq->subType);
buf = taosDecodeFixedI8(buf, &pReq->withMeta);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
......@@ -2677,6 +2681,34 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int16_t resMsgType;
int32_t metaRspLen;
void* metaRsp;
} SMqMetaRsp;
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
return (void*)buf;
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;
......
......@@ -20,9 +20,9 @@
extern "C" {
#endif
#include "catalog.h"
#include "parser.h"
#include "planner.h"
#include "catalog.h"
#include "query.h"
#include "taos.h"
#include "tcommon.h"
......@@ -51,10 +51,12 @@ extern "C" {
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
RES_TYPE__TMQ_META,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
typedef struct SAppInstInfo SAppInstInfo;
......@@ -66,9 +68,9 @@ typedef struct {
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
SRWLatch lock; // lock is used in serialization
SAppInstInfo* pAppInstInfo;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
......@@ -76,13 +78,13 @@ typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
typedef struct {
int8_t inited;
int64_t appId;
int8_t inited;
int64_t appId;
// ctl
int8_t threadStop;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
SHashObj *appSummary;
TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary;
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[CONN_TYPE__MAX];
......@@ -129,7 +131,7 @@ typedef struct STscObj {
int8_t connType;
int32_t acctId;
uint32_t connId;
TAOS *id; // ref ID returned by taosAddRef
TAOS* id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
......@@ -188,6 +190,14 @@ typedef struct {
SReqResultInfo resInfo;
} SMqRspObj;
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SMqMetaRsp metaRsp;
} SMqMetaRspObj;
typedef struct SRequestObj {
int8_t resType; // query or tmq
uint64_t requestId;
......@@ -206,9 +216,9 @@ typedef struct SRequestObj {
SRequestSendRecvBody body;
bool stableQuery;
bool killed;
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
uint32_t retry;
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
} SRequestObj;
typedef struct SSyncQueryParam {
......@@ -216,15 +226,15 @@ typedef struct SSyncQueryParam {
SRequestObj* pRequest;
} SSyncQueryParam;
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
......@@ -289,7 +299,7 @@ bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
......@@ -299,7 +309,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void taos_close_internal(void *taos);
void taos_close_internal(void* taos);
// --- heartbeat
// global, called by mgmt
......@@ -320,12 +330,12 @@ void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest);
#ifdef __cplusplus
......
......@@ -17,6 +17,7 @@
#include "clientInt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
......@@ -25,7 +26,6 @@
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "functionMgt.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......@@ -97,11 +97,11 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS;
}
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
return pObj->id;
}
return NULL;
}
......@@ -111,41 +111,40 @@ void taos_close_internal(void *taos) {
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs);
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id);
taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id);
}
void taos_close(TAOS *taos) {
if (taos == NULL) {
return;
}
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
return;
}
taos_close_internal(pObj);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
taosMemoryFree(taos);
}
int taos_errno(TAOS_RES *tres) {
if (tres == NULL) {
int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return terrno;
}
if (TD_RES_TMQ(tres)) {
if (TD_RES_TMQ(res)) {
return 0;
}
return ((SRequestObj *)tres)->code;
return ((SRequestObj *)res)->code;
}
const char *taos_errstr(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return (const char *)tstrerror(terrno);
}
......@@ -179,11 +178,15 @@ void taos_free_result(TAOS_RES *res) {
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp);
taosMemoryFree(pRspObj);
}
}
int taos_field_count(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
......@@ -194,7 +197,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) {
return NULL;
}
......@@ -215,8 +218,8 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
......@@ -229,21 +232,21 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
taos_query_a(taos, sql, syncQueryFn, param);
tsem_wait(&param->sem);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return pRes;
#endif
......@@ -380,7 +383,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
}
int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return NULL;
}
......@@ -389,7 +392,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
}
TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
......@@ -438,7 +441,7 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res)) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
return 0;
}
......@@ -448,7 +451,7 @@ int taos_affected_rows(TAOS_RES *res) {
}
int taos_result_precision(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return TSDB_TIME_PRECISION_MILLI;
}
......@@ -463,15 +466,15 @@ int taos_result_precision(TAOS_RES *res) {
}
int taos_select_db(TAOS *taos, const char *db) {
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (pObj == NULL) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED;
}
if (db == NULL || strlen(db) == 0) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
......@@ -483,19 +486,19 @@ int taos_select_db(TAOS *taos, const char *db) {
int32_t code = taos_errno(pRequest);
taos_free_result(pRequest);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return code;
}
void taos_stop_query(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
return;
}
SRequestObj *pRequest = (SRequestObj *)res;
pRequest->killed = true;
int32_t numOfFields = taos_num_fields(pRequest);
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId);
......@@ -510,6 +513,9 @@ void taos_stop_query(TAOS_RES *res) {
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return true;
}
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) {
return true;
......@@ -532,7 +538,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
......@@ -575,7 +581,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
}
int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
......@@ -615,7 +621,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
......@@ -636,7 +642,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return;
......@@ -644,17 +650,17 @@ void taos_reset_current_db(TAOS *taos) {
resetConnectDB(pTscObj);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
}
const char *taos_get_server_info(TAOS *taos) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return pTscObj->ver;
}
......@@ -682,7 +688,7 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug("enter meta callback, code %s", tstrerror(code));
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
......@@ -723,11 +729,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
......@@ -745,7 +751,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
......@@ -849,8 +855,8 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->requestId);
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0;
......@@ -884,6 +890,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
......@@ -909,6 +916,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
......@@ -922,6 +930,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
const void *taos_get_raw_block(TAOS_RES *res) {
ASSERT(res != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
return pRequest->body.resInfo.pData;
......@@ -948,16 +957,16 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
TAOS_STMT *taos_stmt_init(TAOS *taos) {
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
TAOS_STMT* pStmt = stmtInit(pObj);
releaseTscObj(*(int64_t*)taos);
TAOS_STMT *pStmt = stmtInit(pObj);
releaseTscObj(*(int64_t *)taos);
return pStmt;
}
......
......@@ -149,7 +149,10 @@ typedef struct {
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqDataBlkRsp msg;
union {
SMqDataBlkRsp dataRsp;
SMqMetaRsp metaRsp;
};
} SMqPollRspWrapper;
typedef struct {
......@@ -1131,6 +1134,11 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
}
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
......@@ -1138,17 +1146,23 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto CREATE_MSG_FAIL;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
pRspWrapper->tmqRspType = rspType;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
} else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
}
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
......@@ -1516,6 +1530,17 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
return pReq;
}
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
pRspObj->resType = RES_TYPE__TMQ;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
return pRspObj;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
......@@ -1523,11 +1548,11 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->msg.withSchema) {
if (!pWrapper->dataRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
......@@ -1643,12 +1668,12 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->msg.head.epoch == consumerEpoch) {
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->msg.blockNum == 0) {
if (pollRspWrapper->dataRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
......@@ -1658,8 +1683,25 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch,
consumerEpoch);
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else {
......@@ -1747,10 +1789,23 @@ const char* tmq_err2str(int32_t err) {
}
}
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
return TMQ_RES_DATA;
} else if (TD_RES_TMQ_META(res)) {
return TMQ_RES_TABLE_META;
} else {
return TMQ_RES_INVALID;
}
}
const char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->topic, '.') + 1;
} else {
return NULL;
}
......@@ -1760,6 +1815,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->db, '.') + 1;
} else {
return NULL;
}
......@@ -1769,6 +1827,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else {
return -1;
}
......@@ -1786,6 +1847,16 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) {
if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
*raw_meta = pMetaRspObj->metaRsp.metaRsp;
*raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
return 0;
}
return -1;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}
......
......@@ -2956,6 +2956,7 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
......@@ -2985,6 +2986,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
......
......@@ -420,7 +420,8 @@ typedef struct {
int64_t uid;
int64_t dbUid;
int32_t version;
int8_t subType; // column, db or stable
int8_t subType; // column, db or stable
int8_t withMeta; // TODO
SRWLatch lock;
int32_t sqlLen;
int32_t astLen;
......@@ -487,6 +488,7 @@ typedef struct {
int64_t dbUid;
int32_t vgNum;
int8_t subType;
int8_t withMeta;
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
......
......@@ -381,6 +381,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->dbUid = pSub->dbUid;
pSubNew->stbUid = pSub->stbUid;
pSubNew->subType = pSub->subType;
pSubNew->withMeta = pSub->withMeta;
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......@@ -414,6 +415,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType);
tlen += taosEncodeFixedI8(buf, pSub->withMeta);
tlen += taosEncodeFixedI64(buf, pSub->stbUid);
void *pIter = NULL;
......@@ -440,6 +442,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType);
buf = taosDecodeFixedI8(buf, &pSub->withMeta);
buf = taosDecodeFixedI64(buf, &pSub->stbUid);
int32_t sz;
......
......@@ -96,6 +96,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->dbUid = pTopic->dbUid;
pSub->stbUid = pTopic->stbUid;
pSub->subType = pTopic->subType;
pSub->withMeta = pTopic->withMeta;
ASSERT(pSub->unassignedVgs->size == 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
......@@ -120,6 +121,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg;
req.subType = pSub->subType;
req.withMeta = pSub->withMeta;
req.suid = pSub->stbUid;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
......
......@@ -141,6 +141,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
......@@ -208,6 +209,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
......@@ -357,6 +359,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.subType = pCreate->subType;
topicObj.withMeta = pCreate->withMeta;
if (topicObj.withMeta) {
ASSERT(topicObj.subType != TOPIC_SUB_TYPE__COLUMN);
}
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = strdup(pCreate->ast);
......
......@@ -114,6 +114,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
// reader
SWalReadHandle* pWalReader;
......
......@@ -85,6 +85,34 @@ void tqClose(STQ* pTq) {
}
}
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqMetaRsp(&abuf, pRsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, res msg type %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0;
}
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
......@@ -250,8 +278,23 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*ASSERT(0);*/
}
} else {
// TODO
ASSERT(0);
ASSERT(pHandle->fetchMeta);
ASSERT(pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB ||
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
// return
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->currentOffset;
metaRsp.rspOffset = fetchOffset;
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
}
code = 0;
goto OVER;
}
// TODO batch optimization:
......@@ -276,7 +319,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {
code = -1;
}
OVER:
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
......@@ -490,7 +533,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRunReq(pTask);
......@@ -507,7 +550,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
......@@ -522,7 +565,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverReq(pTask, pReq, pMsg);
......@@ -533,7 +576,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessDispatchRsp(pTask, pRsp);
......@@ -544,7 +587,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverRsp(pTask, pRsp);
......
......@@ -42,6 +42,25 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
code = 0;
goto END;
} else {
if (pHandle->fetchMeta) {
SWalReadHead* pHead = &((*ppHeadWithCkSum)->head);
if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB ||
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);
*fetchOffset = offset;
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END;
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);
......
......@@ -82,12 +82,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
return NULL;
}
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
}
}
streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册