提交 9da06b52 编写于 作者: H Haojun Liao

[td-11818] merge 3.0

......@@ -875,6 +875,7 @@ typedef struct SSubQueryMsg {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t contentLen;
char msg[];
} SSubQueryMsg;
......@@ -1524,9 +1525,23 @@ typedef struct SMqSetCVgReq {
char* sql;
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SSubQueryMsg>
SSubQueryMsg msg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen;
if (buf == NULL) return tlen;
memcpy(*buf, pMsg, tlen);
*buf = POINTER_SHIFT(*buf, tlen);
return tlen;
}
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen;
memcpy(pMsg, buf, tlen);
return POINTER_SHIFT(buf, tlen);
}
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId);
......@@ -1536,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
......@@ -1547,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
pReq->tasks = NULL;
buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
......
......@@ -84,6 +84,13 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
*/
int32_t qKillTask(qTaskInfo_t qinfo);
/**
* kill the ongoing query asynchronously
* @param qinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
/**
* return whether query is completed or not
* @param qinfo
......
......@@ -38,6 +38,11 @@ enum {
JOB_TASK_STATUS_FREEING,
};
enum {
TASK_TYPE_PERSISTENT = 1,
TASK_TYPE_TEMP,
};
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
......@@ -104,12 +109,71 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta;
} STableMetaOutput;
const SSchema* tGetTbnameColumnSchema();
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) {
pEpSet->inUse = pAddr->inUse;
pEpSet->numOfEps = pAddr->numOfEps;
for (int j = 0; j < TSDB_MAX_REPLICA; j++) {
pEpSet->port[j] = pAddr->epAddr[j].port;
memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN);
}
}
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
......
......@@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -362,6 +362,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B) //"Task status error")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
......
......@@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer {
#endif
typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned
SEpSet epset;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
int32_t vgId; // -1 for unassigned
SEpSet epset;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
int32_t execLen;
SSubQueryMsg qExec;
} SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
......@@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
return tlen;
}
......@@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
return buf;
}
......
......@@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
......@@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
int32_t sz;
//convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
}
int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg
for (int32_t i = 0; i < sz; i++) {
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
int32_t vgId = pTaskInfo->addr.nodeId;
SEpSet epSet;
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
}
/*pTopic->physicalPlan;*/
SVgObj *pVgroup = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
......@@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
}
return 0;
qDestroyQueryDag(pDag);
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
......
......@@ -682,7 +682,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
......@@ -734,6 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
// TODO: filter out unused column
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
......@@ -763,10 +763,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo);
return pArray;
}
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
* status) {*/
/*return 0;*/
/*}*/
static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
......
......@@ -276,6 +276,19 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
return TSDB_CODE_SUCCESS;
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId);
setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS;
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
......
......@@ -84,6 +84,7 @@ typedef struct SQWMsg {
typedef struct SQWPhaseInput {
int8_t status;
int8_t taskType;
int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
......@@ -91,8 +92,7 @@ typedef struct SQWPhaseInput {
typedef struct SQWPhaseOutput {
int32_t rspCode;
bool needStop;
bool needRsp;
bool needStop;
} SQWPhaseOutput;
......@@ -104,10 +104,15 @@ typedef struct SQWTaskStatus {
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
void *readyConnection;
void *dropConnection;
void *cancelConnection;
bool emptyRes;
int8_t queryContinue;
int8_t inQueue;
int8_t queryInQueue;
int32_t rspCode;
int8_t events[QW_EVENT_MAX];
......@@ -170,6 +175,10 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
......
......@@ -23,7 +23,7 @@ extern "C" {
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType);
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
......
此差异已折叠。
......@@ -290,7 +290,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......@@ -374,7 +374,9 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
msg->sId = htobe64(msg->sId);
uint64_t sId = msg->sId;
SSchedulerStatusRsp *sStatus = NULL;
......
......@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
qworkerTest
PUBLIC os util common transport gtest qcom planner qworker
PUBLIC os util common transport gtest qcom planner qworker executor
)
TARGET_INCLUDE_DIRECTORIES(
......
......@@ -32,11 +32,75 @@
#include "qworker.h"
#include "stub.h"
#include "addr_any.h"
#include "executor.h"
#include "dataSinkMgt.h"
namespace {
bool testStop = false;
bool qwtTestEnableSleep = true;
bool qwtTestStop = false;
bool qwtTestDeadLoop = true;
int32_t qwtTestMTRunSec = 10;
int32_t qwtTestPrintNum = 100000;
int32_t qwtTestCaseIdx = 0;
int32_t qwtTestCaseNum = 4;
void qwtInitLogFile() {
const char *defaultLogFileNamePrefix = "taosdlog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc->pCont = queryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
readyMsg->sId = htobe64(1);
readyMsg->queryId = htobe64(1);
readyMsg->taskId = htobe64(1);
readyRpc->pCont = readyMsg;
readyRpc->contLen = sizeof(SResReadyReq);
}
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(1);
fetchMsg->taskId = htobe64(1);
fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq);
}
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(1);
dropMsg->taskId = htobe64(1);
dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq);
}
void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
statusMsg->sId = htobe64(1);
statusRpc->pCont = statusMsg;
statusRpc->contLen = sizeof(SSchTasksStatusReq);
statusRpc->msgType = TDMT_VND_TASKS_STATUS;
}
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
......@@ -48,6 +112,7 @@ int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
/*
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
printf("task num:%d\n", rsp->num);
......@@ -56,9 +121,63 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
}
}
*/
return;
}
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
if (0 == idx) {
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
*handle = (DataSinkHandle)qwtTestCaseIdx+1;
} else if (1 == idx) {
*pTaskInfo = NULL;
*handle = NULL;
} else if (2 == idx) {
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
*handle = NULL;
} else if (3 == idx) {
*pTaskInfo = NULL;
*handle = (DataSinkHandle)qwtTestCaseIdx;
}
++qwtTestCaseIdx;
return 0;
}
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
return 0;
}
int32_t qwtKillTask(qTaskInfo_t qinfo) {
return 0;
}
void qwtDestroyTask(qTaskInfo_t qHandle) {
}
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
return 0;
}
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
}
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
}
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
return 0;
}
void qwtDestroyDataSinker(DataSinkHandle handle) {
}
void stubSetStringToPlan() {
......@@ -74,11 +193,118 @@ void stubSetStringToPlan() {
}
}
void stubSetExecTask() {
static Stub stub;
stub.set(qExecTask, qwtExecTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qExecTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtExecTask);
}
}
}
void stubSetCreateExecTask() {
static Stub stub;
stub.set(qCreateExecTask, qwtCreateExecTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtCreateExecTask);
}
}
}
void stubSetAsyncKillTask() {
static Stub stub;
stub.set(qAsyncKillTask, qwtKillTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtKillTask);
}
}
}
void stubSetDestroyTask() {
static Stub stub;
stub.set(qDestroyTask, qwtDestroyTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qDestroyTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtDestroyTask);
}
}
}
void stubSetDestroyDataSinker() {
static Stub stub;
stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
for (const auto& f : result) {
stub.set(f.second, qwtDestroyDataSinker);
}
}
}
void stubSetGetDataLength() {
static Stub stub;
stub.set(dsGetDataLength, qwtGetDataLength);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
for (const auto& f : result) {
stub.set(f.second, qwtGetDataLength);
}
}
}
void stubSetEndPut() {
static Stub stub;
stub.set(dsEndPut, qwtEndPut);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsEndPut$", result);
for (const auto& f : result) {
stub.set(f.second, qwtEndPut);
}
}
}
void stubSetPutDataBlock() {
static Stub stub;
stub.set(dsPutDataBlock, qwtPutDataBlock);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
for (const auto& f : result) {
stub.set(f.second, qwtPutDataBlock);
}
}
}
void stubSetRpcSendResponse() {
static Stub stub;
stub.set(rpcSendResponse, qwtRpcSendResponse);
{
AddrAny any("libplanner.so");
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
for (const auto& f : result) {
......@@ -87,24 +313,35 @@ void stubSetRpcSendResponse() {
}
}
void stubSetGetDataBlock() {
static Stub stub;
stub.set(dsGetDataBlock, qwtGetDataBlock);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
for (const auto& f : result) {
stub.set(f.second, qwtGetDataBlock);
}
}
}
void *queryThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
while (!testStop) {
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
}
}
......@@ -119,16 +356,14 @@ void *readyThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("ready:%d\n", n);
}
}
......@@ -143,16 +378,14 @@ void *fetchThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("fetch:%d\n", n);
}
}
......@@ -167,16 +400,14 @@ void *dropThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("drop:%d\n", n);
}
}
......@@ -191,16 +422,14 @@ void *statusThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
statusMsg.sId = htobe64(1);
while (!qwtTestStop) {
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("status:%d\n", n);
}
}
......@@ -209,6 +438,35 @@ void *statusThread(void *param) {
}
void *controlThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
}
}
return NULL;
}
void *queryQueueThread(void *param) {
}
void *fetchQueueThread(void *param) {
}
......@@ -224,6 +482,8 @@ TEST(seqTest, normalCase) {
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
......@@ -262,6 +522,15 @@ TEST(seqTest, normalCase) {
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
......@@ -308,6 +577,8 @@ TEST(seqTest, cancelFirst) {
SRpcMsg queryRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
......@@ -348,7 +619,7 @@ TEST(seqTest, cancelFirst) {
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
......@@ -366,44 +637,16 @@ TEST(seqTest, randCase) {
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetCreateExecTask();
srand(time(NULL));
......@@ -416,20 +659,25 @@ TEST(seqTest, randCase) {
int32_t r = rand() % maxr;
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
} else {
......@@ -445,6 +693,8 @@ TEST(seqTest, multithreadRand) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
......@@ -464,15 +714,69 @@ TEST(seqTest, multithreadRand) {
pthread_create(&(t4), &thattr, dropThread, NULL);
pthread_create(&(t5), &thattr, statusThread, NULL);
int32_t t = 0;
int32_t maxr = 10001;
sleep(300);
testStop = true;
sleep(1);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
sleep(3);
qWorkerDestroy(&mgmt);
}
TEST(rcTest, multithread) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
srand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, controlThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, NULL);
pthread_create(&(t3), &thattr, fetchQueueThread, NULL);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
sleep(3);
qWorkerDestroy(&mgmt);
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
......
......@@ -1102,6 +1102,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->contentLen = htonl(pTask->msgLen);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
break;
......@@ -1487,6 +1488,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(plan->id.queryId);
pMsg->taskId = htobe64(schGenUUID());
pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = htonl(msgLen);
memcpy(pMsg->msg, msg, msgLen);
......
......@@ -361,6 +361,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
......
......@@ -85,7 +85,7 @@ void createDbAndStb() {
}
taos_free_result(pRes);
sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j int)", stbName);
sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j bigint)", stbName);
pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if (code != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册