未验证 提交 244b552f 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10076 from taosdata/feature/tq

mq support stable subscribe
...@@ -204,12 +204,19 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); ...@@ -204,12 +204,19 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
typedef struct tmq_resp_err_t tmq_resp_err_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
DLL_EXPORT tmq_conf_t* tmq_conf_new(); DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
...@@ -217,13 +224,8 @@ DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, ...@@ -217,13 +224,8 @@ DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr,
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time); DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async);
DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg);
DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg);
DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg);
DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter);
DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -38,6 +38,10 @@ ...@@ -38,6 +38,10 @@
// int16_t bytes; // int16_t bytes;
//} SSchema; //} SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray *pGroupList; SArray *pGroupList;
......
...@@ -27,6 +27,10 @@ typedef void* DataSinkHandle; ...@@ -27,6 +27,10 @@ typedef void* DataSinkHandle;
struct SRpcMsg; struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle {
void* reader;
void* meta;
} SReadHandle;
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
...@@ -35,7 +39,13 @@ struct SSubplan; ...@@ -35,7 +39,13 @@ struct SSubplan;
*/ */
qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); /**
*
* @param tinfo
* @param input
* @return
*/
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input);
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
...@@ -46,7 +56,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); ...@@ -46,7 +56,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
* @param qId * @param qId
* @return * @return
*/ */
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/** /**
* The main task execution function, including query on both table and multiple tables, * The main task execution function, including query on both table and multiple tables,
......
...@@ -107,14 +107,14 @@ typedef struct SPoint1 { ...@@ -107,14 +107,14 @@ typedef struct SPoint1 {
union{double val; char* ptr;}; union{double val; char* ptr;};
} SPoint1; } SPoint1;
struct SQLFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified //for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SExtTagsInfo { typedef struct SExtTagsInfo {
int16_t tagsLen; // keep the tags data for top/bottom query result int16_t tagsLen; // keep the tags data for top/bottom query result
int16_t numOfTagCols; int16_t numOfTagCols;
struct SQLFunctionCtx **pTagCtxList; struct SqlFunctionCtx **pTagCtxList;
} SExtTagsInfo; } SExtTagsInfo;
typedef struct SResultDataInfo { typedef struct SResultDataInfo {
...@@ -126,18 +126,18 @@ typedef struct SResultDataInfo { ...@@ -126,18 +126,18 @@ typedef struct SResultDataInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
typedef struct SFunctionFpSet { typedef struct SFunctionFpSet {
bool (*init)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment bool (*init)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(struct SQLFunctionCtx *pCtx); void (*addInput)(struct SqlFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result. // finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(struct SQLFunctionCtx *pCtx); void (*finalize)(struct SqlFunctionCtx *pCtx);
void (*combine)(struct SQLFunctionCtx *pCtx); void (*combine)(struct SqlFunctionCtx *pCtx);
} SFunctionFpSet; } SFunctionFpSet;
extern SFunctionFpSet fpSet[1]; extern SFunctionFpSet fpSet[1];
// sql function runtime context // sql function runtime context
typedef struct SQLFunctionCtx { typedef struct SqlFunctionCtx {
int32_t size; // number of rows int32_t size; // number of rows
void * pInput; // input data buffer void * pInput; // input data buffer
uint32_t order; // asc|desc uint32_t order; // asc|desc
...@@ -167,7 +167,7 @@ typedef struct SQLFunctionCtx { ...@@ -167,7 +167,7 @@ typedef struct SQLFunctionCtx {
int32_t columnIndex; int32_t columnIndex;
SFunctionFpSet* fpSet; SFunctionFpSet* fpSet;
} SQLFunctionCtx; } SqlFunctionCtx;
enum { enum {
TEXPR_NODE_DUMMY = 0x0, TEXPR_NODE_DUMMY = 0x0,
...@@ -216,14 +216,14 @@ typedef struct SAggFunctionInfo { ...@@ -216,14 +216,14 @@ typedef struct SAggFunctionInfo {
int8_t sFunctionId; // Transfer function for super table query int8_t sFunctionId; // Transfer function for super table query
uint16_t status; uint16_t status;
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment bool (*init)(SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(SQLFunctionCtx *pCtx); void (*addInput)(SqlFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result. // finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(SQLFunctionCtx *pCtx); void (*finalize)(SqlFunctionCtx *pCtx);
void (*combine)(SQLFunctionCtx *pCtx); void (*combine)(SqlFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId); int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo; } SAggFunctionInfo;
struct SScalarFuncParam; struct SScalarFuncParam;
...@@ -279,9 +279,9 @@ void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc); ...@@ -279,9 +279,9 @@ void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
tExprNode* exprdup(tExprNode* pTree); tExprNode* exprdup(tExprNode* pTree);
void resetResultRowEntryResult(SQLFunctionCtx* pCtx, int32_t num); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SQLFunctionCtx* pCtx, int32_t num); int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
......
...@@ -22,7 +22,7 @@ extern "C" { ...@@ -22,7 +22,7 @@ extern "C" {
#include "nodes.h" #include "nodes.h"
struct SQLFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
struct STimeWindow; struct STimeWindow;
...@@ -32,9 +32,9 @@ typedef struct SFuncExecEnv { ...@@ -32,9 +32,9 @@ typedef struct SFuncExecEnv {
typedef void* FuncMgtHandle; typedef void* FuncMgtHandle;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SQLFunctionCtx *pCtx); typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SQLFunctionCtx *pCtx); typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef struct SFuncExecFuncs { typedef struct SFuncExecFuncs {
FExecGetEnv getEnv; FExecGetEnv getEnv;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#endif #endif
OP_ENUM_MACRO(StreamScan) OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(TagScan)
...@@ -48,3 +47,5 @@ OP_ENUM_MACRO(AllTimeWindow) ...@@ -48,3 +47,5 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval) OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order) OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange) OP_ENUM_MACRO(Exchange)
//OP_ENUM_MACRO(TableScan)
...@@ -254,6 +254,7 @@ int32_t* taosGetErrno(); ...@@ -254,6 +254,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) #define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) #define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7)
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
......
#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
...@@ -22,6 +21,93 @@ ...@@ -22,6 +21,93 @@
} \ } \
} while (0) } while (0)
typedef struct SMqClientVg {
// statistics
int64_t pollCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
//connection info
int32_t vgId;
SEpSet epSet;
} SMqClientVg;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
int32_t nextVgIdx;
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic;
struct tmq_resp_err_t {
int32_t code;
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
};
typedef struct SMqConsumeCbParam {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t** retMsg;
} SMqConsumeCbParam;
struct tmq_conf_t {
char clientId[256];
char groupId[256];
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
};
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf;
}
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
}
return 0;
}
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
...@@ -260,83 +346,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -260,83 +346,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
} }
typedef struct SMqClientVg {
// statistics
int64_t pollCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
//connection info
int32_t vgId;
SEpSet epSet;
} SMqClientVg;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
int32_t nextVgIdx;
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic;
typedef struct tmq_resp_err_t {
int32_t code;
} tmq_resp_err_t;
typedef struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
} tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
} tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
struct tmq_conf_t {
char clientId[256];
char groupId[256];
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
};
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf;
}
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
}
return 0;
}
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1); tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
...@@ -583,7 +592,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -583,7 +592,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
int tlen = tSerializeSCMCreateTopicReq(NULL, &req); int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
void* buf = malloc(tlen); void* buf = malloc(tlen);
if(buf == NULL) { if (buf == NULL) {
goto _return; goto _return;
} }
...@@ -615,18 +624,64 @@ _return: ...@@ -615,18 +624,64 @@ _return:
return pRequest; return pRequest;
} }
/*typedef SMqConsumeRsp tmq_message_t;*/ static char *formatTimestamp(char *buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
struct tmq_message_t { /* comment out as it make testcases like select_with_tags.sim fail.
SMqConsumeRsp rsp; but in windows, this may cause the call to localtime crash if tt < 0,
}; need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm *ptm = localtime(&tt);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == -1) { if (code == -1) {
printf("discard\n"); printf("msg discard\n");
return 0; return 0;
} }
SMqClientVg* pVg = (SMqClientVg*)param; char pBuf[128];
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqConsumeRsp rsp; SMqConsumeRsp rsp;
tDecodeSMqConsumeRsp(pMsg->pData, &rsp); tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
if (rsp.numOfTopics == 0) { if (rsp.numOfTopics == 0) {
...@@ -639,10 +694,11 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -639,10 +694,11 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("-----msg begin----\n");*/ /*printf("-----msg begin----\n");*/
printf("|"); printf("|");
for (int32_t i = 0; i < colNum; i++) { for (int32_t i = 0; i < colNum; i++) {
printf(" %15s |", rsp.schemas->pSchema[i].name); if (i == 0) printf(" %25s |", rsp.schemas->pSchema[i].name);
else printf(" %15s |", rsp.schemas->pSchema[i].name);
} }
printf("\n"); printf("\n");
printf("=====================================\n"); printf("===============================================\n");
int32_t sz = taosArrayGetSize(rsp.pBlockData); int32_t sz = taosArrayGetSize(rsp.pBlockData);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i);
...@@ -654,7 +710,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -654,7 +710,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch(pColInfoData->info.type) { switch(pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
printf(" %15lu |", *(uint64_t*)var); formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
...@@ -678,8 +735,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -678,8 +735,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait);
printf("exit wait %d\n", pParam->wait);
if (pParam->wait) { if (pParam->wait) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
} }
...@@ -770,47 +826,70 @@ END: ...@@ -770,47 +826,70 @@ END:
return 0; return 0;
} }
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, SMqClientVg** ppVg) {
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
if (pReq == NULL) {
return NULL;
}
pReq->reqType = type;
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
strcpy(pReq->cgroup, tmq->groupId);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(pReq->topic, pTopic->topicName);
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs));
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
pReq->offset = pVg->currentOffset+1;
*ppVg = pVg;
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
return pReq;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL;
int64_t status = atomic_load_64(&tmq->status); int64_t status = atomic_load_64(&tmq->status);
tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics));
if (blocking_time < 0) blocking_time = 500; /*if (blocking_time < 0) blocking_time = 500;*/
blocking_time = 1000;
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
return NULL; return NULL;
} }
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
pReq->reqType = 1;
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(pReq->cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); if (taosArrayGetSize(pTopic->vgs) == 0) {
strcpy(pReq->topic, pTopic->topicName);
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
if (vgSz == 0) {
free(pReq);
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
return NULL; return NULL;
} }
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % vgSz);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
pReq->offset = pVg->currentOffset+1;
pReq->head.vgId = htonl(pVg->vgId); SMqClientVg* pVg = NULL;
pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, &pVg);
if (pReq == NULL) {
usleep(blocking_time * 1000);
return NULL;
}
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam));
if (param == NULL) {
usleep(blocking_time * 1000);
return NULL;
}
param->tmq = tmq;
param->retMsg = &tmq_message;
param->pVg = pVg;
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pVg; sendInfo->param = param;
sendInfo->fp = tmq_poll_cb_inner; sendInfo->fp = tmq_poll_cb_inner;
/*printf("req offset: %ld\n", pReq->offset);*/ /*printf("req offset: %ld\n", pReq->offset);*/
......
...@@ -562,9 +562,9 @@ TEST(testCase, insert_test) { ...@@ -562,9 +562,9 @@ TEST(testCase, insert_test) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif #endif
TEST(testCase, create_topic_Test) {
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -583,13 +583,37 @@ TEST(testCase, create_topic_Test) { ...@@ -583,13 +583,37 @@ TEST(testCase, create_topic_Test) {
taos_free_result(pRes); taos_free_result(pRes);
char* sql = "select * from tu"; char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TEST(testCase, tmq_subscribe_Test) { TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select ts, k from st1";
pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -604,11 +628,36 @@ TEST(testCase, tmq_subscribe_Test) { ...@@ -604,11 +628,36 @@ TEST(testCase, tmq_subscribe_Test) {
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1"); tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list); tmq_subscribe(tmq, topic_list);
while (1) { while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 1000); tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
//printf("get msg\n"); //printf("get msg\n");
//if (msg == NULL) break; //if (msg == NULL) break;
} }
...@@ -619,6 +668,7 @@ TEST(testCase, tmq_consume_Test) { ...@@ -619,6 +668,7 @@ TEST(testCase, tmq_consume_Test) {
TEST(testCase, tmq_commit_TEST) { TEST(testCase, tmq_commit_TEST) {
} }
#endif
#if 0 #if 0
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
......
...@@ -96,7 +96,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -96,7 +96,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
bool found = 0; bool found = 0;
bool changed = 0; bool changed = 0;
for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t*)taosArrayGet(pSub->availConsumer, j) == consumerId) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) {
found = 1; found = 1;
break; break;
} }
...@@ -105,16 +105,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -105,16 +105,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
taosArrayPush(pSub->availConsumer, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
} }
int32_t assignedSz = taosArrayGetSize(pSub->assigned); int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) { for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
if (pCEp->consumerId == consumerId) { if (pCEp->consumerId == consumerId) {
pCEp->lastConsumerHbTs = currentTs; pCEp->lastConsumerHbTs = currentTs;
SMqSubVgEp vgEp = { SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId};
.epSet = pCEp->epSet,
.vgId = pCEp->vgId
};
taosArrayPush(topicEp.vgs, &vgEp); taosArrayPush(topicEp.vgs, &vgEp);
changed = 1; changed = 1;
} }
...@@ -123,7 +120,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -123,7 +120,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
taosArrayPush(rsp.topics, &topicEp); taosArrayPush(rsp.topics, &topicEp);
} }
if (changed || found) { if (changed || found) {
SSdbRaw* pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw); sdbWriteNotFree(pMnode->pSdb, pRaw);
} }
...@@ -137,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -137,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
void *abuf = buf; void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
//TODO: free rsp // TODO: free rsp
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
return 0; return 0;
...@@ -164,9 +161,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -164,9 +161,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
int32_t sz; int32_t sz;
while (pIter != NULL) { while (pIter != NULL) {
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId; int64_t consumerId = pCEp->consumerId;
if(pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
// put consumer into lostConsumer // put consumer into lostConsumer
taosArrayPush(pSub->lostConsumer, pCEp); taosArrayPush(pSub->lostConsumer, pCEp);
// put vg into unassgined // put vg into unassgined
...@@ -176,7 +173,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -176,7 +173,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayRemove(pSub->assigned, i); taosArrayRemove(pSub->assigned, i);
// remove from available consumer // remove from available consumer
for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t*)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j); taosArrayRemove(pSub->availConsumer, j);
break; break;
} }
...@@ -209,7 +206,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -209,7 +206,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayPush(pSub->assigned, pCEp); taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++; pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ /*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
...@@ -269,33 +266,50 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -269,33 +266,50 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
// convert phyplan to dag // convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray; SArray *pArray = NULL;
SArray *inner = taosArrayGet(pDag->pSubplans, 0); SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0); SSubplan *plan = taosArrayGetP(inner, 0);
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
plan->execNode.nodeId = 2; void *pIter = NULL;
SEpSet* pEpSet = &plan->execNode.epset; while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
}
if (pArray && taosArrayGetSize(pArray) != 1) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));
return -1;
}
pEpSet->inUse = 0;
addEpIntoEpSet(pEpSet, "localhost", 6030);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
return -1;
}
int32_t sz = taosArrayGetSize(pArray);
// convert dag to msg
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.status = 0; CEp.status = 0;
CEp.consumerId = -1; CEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, i); STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
CEp.epSet = pTaskInfo->addr.epset; CEp.epSet = pTaskInfo->addr.epset;
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
ASSERT(CEp.vgId == pVgroup->vgId);
CEp.qmsg = strdup(pTaskInfo->msg->msg); CEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &CEp);
//TODO: free taskInfo
taosArrayDestroy(pArray);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
} }
/*qDestroyQueryDag(pDag);*/ /*qDestroyQueryDag(pDag);*/
...@@ -608,7 +622,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -608,7 +622,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
} }
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
bool create = false; bool create = false;
if (pSub == NULL) { if (pSub == NULL) {
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName); mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
pSub = tNewSubscribeObj(); pSub = tNewSubscribeObj();
...@@ -624,7 +638,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -624,7 +638,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
strcpy(pSub->key, key); strcpy(pSub->key, key);
free(key); free(key);
// set unassigned vg // set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
//TODO: free memory
return -1;
}
// TODO: disable alter // TODO: disable alter
create = true; create = true;
} }
......
...@@ -103,6 +103,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) { ...@@ -103,6 +103,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (walEndSnapshot(pWal) < 0) { if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER; goto WAL_RESTORE_OVER;
} }
} }
code = 0; code = 0;
......
...@@ -91,7 +91,7 @@ int tsdbCommit(STsdb *pTsdb); ...@@ -91,7 +91,7 @@ int tsdbCommit(STsdb *pTsdb);
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT; typedef void* tsdbReaderT;
/** /**
* Get the data block iterator, starting from position according to the query condition * Get the data block iterator, starting from position according to the query condition
...@@ -103,7 +103,7 @@ typedef void* tsdbReadHandleT; ...@@ -103,7 +103,7 @@ typedef void* tsdbReadHandleT;
* @param qinfo query info handle from query processor * @param qinfo query info handle from query processor
* @return * @return
*/ */
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId);
/** /**
* Get the last row of the given query time window for all the tables in STableGroupInfo object. * Get the last row of the given query time window for all the tables in STableGroupInfo object.
...@@ -115,13 +115,13 @@ tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup ...@@ -115,13 +115,13 @@ tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup
* @param tableInfo table list. * @param tableInfo table list.
* @return * @return
*/ */
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, //tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef); // SMemRef *pRef);
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle); bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle);
/** /**
* *
...@@ -138,7 +138,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle); ...@@ -138,7 +138,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
* @param reqId * @param reqId
* @return * @return
*/ */
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
/** /**
...@@ -148,7 +148,7 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch ...@@ -148,7 +148,7 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
* @return row size * @return row size
*/ */
int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle);
/** /**
* move to next block if exists * move to next block if exists
...@@ -156,7 +156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle); ...@@ -156,7 +156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle);
* @param pTsdbReadHandle * @param pTsdbReadHandle
* @return * @return
*/ */
bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
/** /**
* Get current data block information * Get current data block information
...@@ -165,7 +165,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle); ...@@ -165,7 +165,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle);
* @param pBlockInfo * @param pBlockInfo
* @return * @return
*/ */
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/** /**
* *
...@@ -177,7 +177,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo ...@@ -177,7 +177,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return * @return
*/ */
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/** /**
* *
...@@ -189,7 +189,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataS ...@@ -189,7 +189,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataS
* @param pColumnIdList required data columns id list * @param pColumnIdList required data columns id list
* @return * @return
*/ */
SArray *tsdbRetrieveDataBlock(tsdbReadHandleT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
/** /**
* destroy the created table group list, which is generated by tag query * destroy the created table group list, which is generated by tag query
...@@ -205,7 +205,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); ...@@ -205,7 +205,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
* @param pGroupInfo the generated result * @param pGroupInfo the generated result
* @return * @return
*/ */
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/** /**
* *
...@@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro ...@@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro
* clean up the query handle * clean up the query handle
* @param queryHandle * @param queryHandle
*/ */
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -72,7 +72,7 @@ typedef struct STqReadHandle { ...@@ -72,7 +72,7 @@ typedef struct STqReadHandle {
int64_t ver; int64_t ver;
uint64_t tbUid; uint64_t tbUid;
SHashObj* tbIdHash; SHashObj* tbIdHash;
SSubmitMsg* pMsg; const SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
...@@ -208,11 +208,11 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA ...@@ -208,11 +208,11 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA
pReadHandle->pColIdList = pColIdList; pReadHandle->pColIdList = pColIdList;
} }
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { //static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
pHandle->tbUid = tbUid; //pHandle->tbUid = pTableIdList;
} //}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
return -1; return -1;
......
...@@ -824,8 +824,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -824,8 +824,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pMeta };
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
...@@ -866,18 +867,16 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -866,18 +867,16 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
if (pHandle->tbUid == pHandle->pBlock->uid) { /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true; return true;
} else if (pHandle->tbIdHash != NULL) {
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
return true;
}
} }
} }
return false; return false;
......
...@@ -159,7 +159,7 @@ static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGro ...@@ -159,7 +159,7 @@ static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGro
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
//static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); //static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(tsdbReadHandleT pHandle); static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle); static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle);
...@@ -167,7 +167,7 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); ...@@ -167,7 +167,7 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef); //static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
//static void* doFreeColumnInfoData(SArray* pColumnInfoData); //static void* doFreeColumnInfoData(SArray* pColumnInfoData);
//static void* destroyTableCheckInfo(SArray* pTableCheckInfo); //static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReadHandleT pHandle); static bool tsdbGetExternalRow(tsdbReaderT pHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1; pBlockLoadInfo->slot = -1;
...@@ -208,7 +208,7 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load ...@@ -208,7 +208,7 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
return pLocalIdList; return pLocalIdList;
} }
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle) { //int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; // STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
// //
// int64_t rows = 0; // int64_t rows = 0;
...@@ -424,28 +424,28 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, ...@@ -424,28 +424,28 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
return (tsdbReadHandleT)pReadHandle; return (tsdbReaderT)pReadHandle;
_end: _end:
tsdbCleanupQueryHandle(pReadHandle); tsdbCleanupReadHandle(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
} }
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
return (tsdbReadHandleT*) pTsdbReadHandle; return (tsdbReaderT*) pTsdbReadHandle;
} }
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupQueryHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -453,10 +453,10 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup ...@@ -453,10 +453,10 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo),
taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr); taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr);
return (tsdbReadHandleT) pTsdbReadHandle; return (tsdbReaderT) pTsdbReadHandle;
} }
void tsdbResetQueryHandle(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond) { void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) {
STsdbReadHandle* pTsdbReadHandle = queryHandle; STsdbReadHandle* pTsdbReadHandle = queryHandle;
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
...@@ -493,7 +493,7 @@ void tsdbResetQueryHandle(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond) { ...@@ -493,7 +493,7 @@ void tsdbResetQueryHandle(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond) {
resetCheckInfo(pTsdbReadHandle); resetCheckInfo(pTsdbReadHandle);
} }
void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) { void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
STsdbReadHandle* pTsdbReadHandle = queryHandle; STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order; pTsdbReadHandle->order = pCond->order;
...@@ -525,7 +525,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond ...@@ -525,7 +525,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond
pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable); pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupQueryHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
} }
...@@ -533,7 +533,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond ...@@ -533,7 +533,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
} }
tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table // no qualified table
...@@ -561,7 +561,7 @@ tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup ...@@ -561,7 +561,7 @@ tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup
} }
#if 0 #if 0
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
...@@ -581,7 +581,7 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro ...@@ -581,7 +581,7 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
} }
#endif #endif
SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) { SArray* tsdbGetQueriedTableList(tsdbReaderT *pHandle) {
assert(pHandle != NULL); assert(pHandle != NULL);
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) pHandle; STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) pHandle;
...@@ -624,7 +624,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr ...@@ -624,7 +624,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
return pNew; return pNew;
} }
tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
if (pNew->numOfTables == 0) { if (pNew->numOfTables == 0) {
...@@ -2338,7 +2338,7 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2338,7 +2338,7 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
cur->blockCompleted = false; cur->blockCompleted = false;
} }
#if 0 #if 0
int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist* pTableBlockInfo) { int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTableBlockInfo) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;
pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalSize = 0;
...@@ -2494,7 +2494,7 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2494,7 +2494,7 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
} }
//todo not unref yet, since it is not support multi-group interpolation query //todo not unref yet, since it is not support multi-group interpolation query
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReadHandleT pHandle) { static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
// filter the queried time stamp in the first place // filter the queried time stamp in the first place
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
...@@ -2635,7 +2635,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2635,7 +2635,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
} }
if (exists) { if (exists) {
tsdbRetrieveDataBlock((tsdbReadHandleT*) pTsdbReadHandle, NULL); tsdbRetrieveDataBlock((tsdbReaderT*) pTsdbReadHandle, NULL);
if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) { if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey); assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
...@@ -2891,7 +2891,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2891,7 +2891,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
} }
// handle data in cache situation // handle data in cache situation
bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { bool tsdbNextDataBlock(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
...@@ -3051,11 +3051,11 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { ...@@ -3051,11 +3051,11 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) {
// } // }
// //
//out_of_memory: //out_of_memory:
// tsdbCleanupQueryHandle(pSecQueryHandle); // tsdbCleanupReadHandle(pSecQueryHandle);
// return terrno; // return terrno;
//} //}
bool tsdbGetExternalRow(tsdbReadHandleT pHandle) { bool tsdbGetExternalRow(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
...@@ -3112,7 +3112,7 @@ bool tsdbGetExternalRow(tsdbReadHandleT pHandle) { ...@@ -3112,7 +3112,7 @@ bool tsdbGetExternalRow(tsdbReadHandleT pHandle) {
// return code; // return code;
//} //}
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle) { bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle) {
return ((STsdbReadHandle *)pTsdbReadHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE; return ((STsdbReadHandle *)pTsdbReadHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE;
} }
...@@ -3230,7 +3230,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { ...@@ -3230,7 +3230,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
return window; return window;
} }
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) { void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
SQueryFilePos* cur = &pHandle->cur; SQueryFilePos* cur = &pHandle->cur;
...@@ -3254,7 +3254,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReadHandleT* pTsdbReadHandle, SDataBlockInfo* ...@@ -3254,7 +3254,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReadHandleT* pTsdbReadHandle, SDataBlockInfo*
/* /*
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/ */
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT* pTsdbReadHandle, SDataStatis** pBlockStatis) { int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle; STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle;
SQueryFilePos* c = &pHandle->cur; SQueryFilePos* c = &pHandle->cur;
...@@ -3309,7 +3309,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT* pTsdbReadHandle, SDataS ...@@ -3309,7 +3309,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT* pTsdbReadHandle, SDataS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList) { SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
/** /**
* In the following two cases, the data has been loaded to SColumnInfoData. * In the following two cases, the data has been loaded to SColumnInfoData.
* 1. data is from cache, 2. data block is not completed qualified to query time range * 1. data is from cache, 2. data block is not completed qualified to query time range
...@@ -3635,29 +3635,29 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd ...@@ -3635,29 +3635,29 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
//} //}
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid); STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); // tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error; goto _error;
} }
if (pTbCfg->type != META_SUPER_TABLE) { if (pTbCfg->type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); // tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
goto _error; goto _error;
} }
//NOTE: not add ref count for super table //NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableList(tsdb->pMeta, uid, res); int32_t ret = getAllTableList(pMeta, uid, res);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3665,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch ...@@ -3665,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, // tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb,
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); // pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
taosArrayDestroy(res); taosArrayDestroy(res);
return ret; return ret;
...@@ -3722,26 +3722,19 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch ...@@ -3722,26 +3722,19 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
return terrno; return terrno;
} }
#if 0 int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; if (pTbCfg == NULL) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error; goto _error;
} }
assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
pGroupInfo->numOfTables = 1; pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pTable, .lastKey = startKey}; STableKeyInfo info = {.lastKey = startKey, .uid = uid};
taosArrayPush(group, &info); taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
...@@ -3751,6 +3744,7 @@ int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGr ...@@ -3751,6 +3744,7 @@ int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGr
return terrno; return terrno;
} }
#if 0
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) { if (tsdbRLockRepoMeta(tsdb) < 0) {
return terrno; return terrno;
...@@ -3826,7 +3820,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { ...@@ -3826,7 +3820,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
} }
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return; return;
......
...@@ -26,12 +26,14 @@ int vnodeQueryOpen(SVnode *pVnode) { ...@@ -26,12 +26,14 @@ int vnodeQueryOpen(SVnode *pVnode) {
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing"); vTrace("message in query queue is processing");
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:{
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
}
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
default: default:
vError("unknown msg type:%d in query queue", pMsg->msgType); vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
......
...@@ -224,12 +224,12 @@ typedef struct STaskAttr { ...@@ -224,12 +224,12 @@ typedef struct STaskAttr {
// SFilterInfo *pFilters; // SFilterInfo *pFilters;
void* tsdb; void* tsdb;
// SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
SArray *pUdfInfo; // no need to free SArray *pUdfInfo; // no need to free
} STaskAttr; } STaskAttr;
typedef int32_t (*__optr_prepare_fn_t)(void* param);
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
...@@ -313,8 +313,9 @@ typedef struct SOperatorInfo { ...@@ -313,8 +313,9 @@ typedef struct SOperatorInfo {
struct SOperatorInfo **pDownstream; // downstram pointer list struct SOperatorInfo **pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_prepare_fn_t prepareFn;
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanup; __optr_cleanup_fn_t cleanupFn;
} SOperatorInfo; } SOperatorInfo;
enum { enum {
...@@ -395,7 +396,7 @@ typedef struct STableScanInfo { ...@@ -395,7 +396,7 @@ typedef struct STableScanInfo {
int32_t current; int32_t current;
int32_t reverseTimes; // 0 by default int32_t reverseTimes; // 0 by default
SQLFunctionCtx *pCtx; // next operator query context SqlFunctionCtx *pCtx; // next operator query context
SResultRowInfo *pResultRowInfo; SResultRowInfo *pResultRowInfo;
int32_t *rowCellInfoOffset; int32_t *rowCellInfoOffset;
SExprInfo *pExpr; SExprInfo *pExpr;
...@@ -425,7 +426,7 @@ typedef struct SStreamBlockScanInfo { ...@@ -425,7 +426,7 @@ typedef struct SStreamBlockScanInfo {
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx; SqlFunctionCtx *pCtx;
SSDataBlock *pRes; SSDataBlock *pRes;
} SOptrBasicInfo; } SOptrBasicInfo;
...@@ -564,7 +565,6 @@ typedef struct SOrderOperatorInfo { ...@@ -564,7 +565,6 @@ typedef struct SOrderOperatorInfo {
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
...@@ -607,11 +607,11 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -607,11 +607,11 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
void* destroyOutputBuf(SSDataBlock* pBlock); void* destroyOutputBuf(SSDataBlock* pBlock);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(STaskParam *param); void freeParam(STaskParam *param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
...@@ -659,8 +659,8 @@ void freeQueryAttr(STaskAttr *pQuery); ...@@ -659,8 +659,8 @@ void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle, uint64_t taskId); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId);
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H
...@@ -39,7 +39,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) ...@@ -39,7 +39,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { ...@@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*) input, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -51,25 +51,7 @@ static void freeqinfoFn(void *qhandle) { ...@@ -51,25 +51,7 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask(*handle); qDestroyTask(*handle);
} }
void freeParam(STaskParam *param) { int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
tfree(param->sql);
tfree(param->tagCond);
tfree(param->tbnameCond);
tfree(param->pTableIdList);
taosArrayDestroy(param->pOperator);
tfree(param->pExprs);
tfree(param->pSecExprs);
tfree(param->pExpr);
tfree(param->pSecExpr);
tfree(param->pGroupColIndex);
tfree(param->pTagColumnInfo);
tfree(param->pGroupbyExpr);
tfree(param->prevResult);
}
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(readHandle != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......
...@@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) { ...@@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo* pTaskInfo = nullptr; SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr; DataSinkHandle sinkHandle = nullptr;
int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -73,7 +73,7 @@ typedef struct STwaInfo { ...@@ -73,7 +73,7 @@ typedef struct STwaInfo {
extern int32_t functionCompatList[]; // compatible check array list extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval); bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/** /**
* the numOfRes should be kept, since it may be used later * the numOfRes should be kept, since it may be used later
......
...@@ -121,7 +121,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -121,7 +121,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) {
tfree(pUdfInfo); tfree(pUdfInfo);
} }
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) { void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx *pCtx, int32_t idx, int32_t type) {
int32_t output = 0; int32_t output = 0;
if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) { if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) {
......
...@@ -216,7 +216,8 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable ...@@ -216,7 +216,8 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} else if (needSeqScan(pPlanNode)) { } else if (needSeqScan(pPlanNode)) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
} }
return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan); int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type);
} }
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
...@@ -287,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { ...@@ -287,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList; SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode); vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTableInfo, type); return createUserTableScanNode(pPlanNode, pTableInfo, type);
} }
...@@ -296,6 +297,7 @@ static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNo ...@@ -296,6 +297,7 @@ static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNo
if (needMultiNodeScan(pTable)) { if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
} }
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
} }
...@@ -386,6 +388,33 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { ...@@ -386,6 +388,33 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
// todo deal subquery // todo deal subquery
} }
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
// The exchange operator is not necessary, in case of the stream scan.
// Here we need to remove it from the DAG.
if (pQueryNode->info.type == QNODE_STREAMSCAN) {
SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);
if (pSubplan->pNode->info.type == OP_Exchange) {
ASSERT(taosArrayGetSize(pRootLevel) == 1);
taosArrayRemove(pDag->pSubplans, 0);
// And then update the number of the subplans.
pDag->numOfSubplans -= 1;
}
} else {
// Traverse the dag again to acquire the execution node.
if (pNodeList != NULL) {
SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
size_t num = taosArrayGetSize(*pSubLevel);
for (int32_t j = 0; j < num; ++j) {
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
taosArrayPush(pNodeList, &pPlan->execNode);
}
}
}
}
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) { int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
...@@ -407,16 +436,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -407,16 +436,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} END_TRY } END_TRY
// traverse the dag again to acquire the execution node. postCreateDag(pQueryNode, *pDag, pNodeList);
if (pNodeList != NULL) {
SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans);
size_t num = taosArrayGetSize(*pSubLevel);
for (int32_t j = 0; j < num; ++j) {
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
taosArrayPush(pNodeList, &pPlan->execNode);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -88,7 +88,6 @@ static const char* jkPnodeType = "Type"; ...@@ -88,7 +88,6 @@ static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) { static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) { switch (getNumber(json, jkPnodeType)) {
case OP_StreamScan: case OP_StreamScan:
case OP_TableScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return sizeof(STableScanPhyNode); return sizeof(STableScanPhyNode);
...@@ -831,7 +830,6 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { ...@@ -831,7 +830,6 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan: case OP_StreamScan:
case OP_TableScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeToJson(obj, json); return tableScanNodeToJson(obj, json);
...@@ -869,7 +867,6 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { ...@@ -869,7 +867,6 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* phyNode = (SPhyNode*)obj; SPhyNode* phyNode = (SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_TableScan:
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
......
...@@ -254,6 +254,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev ...@@ -254,6 +254,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
// mnode-topic
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")
......
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册