提交 d12532b4 编写于 作者: D dapan1121

feature/qnodew

上级 84e89395
......@@ -101,6 +101,7 @@ void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
......
......@@ -84,6 +84,8 @@ int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
// return vnodeGetTableMeta(pQnode, pMsg);
case TDMT_VND_CONSUME:
// return tqProcessConsumeReq(pQnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg);
default:
qError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -398,7 +398,7 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType, int32
}
}
void generateQuerySQL(BindData *data, int32_t tblIdx) {
void generateQueryCondSQL(BindData *data, int32_t tblIdx) {
int32_t len = sprintf(data->sql, "select * from %s%d where ", bpTbPrefix, tblIdx);
if (!gCurCase->fullCol) {
for (int c = 0; c < gCurCase->bindColNum; ++c) {
......@@ -462,6 +462,72 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) {
}
}
void generateQueryMiscSQL(BindData *data, int32_t tblIdx) {
int32_t len = sprintf(data->sql, "select * from %s%d where ", bpTbPrefix, tblIdx);
if (!gCurCase->fullCol) {
for (int c = 0; c < gCurCase->bindColNum; ++c) {
if (c) {
len += sprintf(data->sql + len, " and ");
}
switch (data->pBind[c].buffer_type) {
case TSDB_DATA_TYPE_BOOL:
len += sprintf(data->sql + len, "booldata");
break;
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(data->sql + len, "tinydata");
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(data->sql + len, "smalldata");
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(data->sql + len, "intdata");
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(data->sql + len, "bigdata");
break;
case TSDB_DATA_TYPE_FLOAT:
len += sprintf(data->sql + len, "floatdata");
break;
case TSDB_DATA_TYPE_DOUBLE:
len += sprintf(data->sql + len, "doubledata");
break;
case TSDB_DATA_TYPE_VARCHAR:
len += sprintf(data->sql + len, "binarydata");
break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(data->sql + len, "ts");
break;
case TSDB_DATA_TYPE_NCHAR:
len += sprintf(data->sql + len, "nchardata");
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(data->sql + len, "utinydata");
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(data->sql + len, "usmalldata");
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(data->sql + len, "uintdata");
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(data->sql + len, "ubigdata");
break;
default:
printf("!!!invalid col type:%d", data->pBind[c].buffer_type);
exit(1);
}
bpAppendOperatorParam(data, &len, data->pBind[c].buffer_type, c);
}
}
if (gCaseCtrl.printStmtSql) {
printf("\tSTMT SQL: %s\n", data->sql);
}
}
void generateErrorSQL(BindData *data, int32_t tblIdx) {
int32_t len = 0;
data->sql = taosMemoryCalloc(1, 1024);
......@@ -677,7 +743,7 @@ int32_t prepareInsertData(BindData *data) {
return 0;
}
int32_t prepareQueryData(BindData *data, int32_t tblIdx) {
int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) {
static int64_t tsData = 1591060628000;
uint64_t bindNum = gCurCase->rowNum / gCurCase->bindRowNum;
......@@ -735,6 +801,63 @@ int32_t prepareQueryData(BindData *data, int32_t tblIdx) {
}
int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
static int64_t tsData = 1591060628000;
uint64_t bindNum = gCurCase->rowNum / gCurCase->bindRowNum;
data->colNum = 0;
data->colTypes = taosMemoryCalloc(30, sizeof(int32_t));
data->sql = taosMemoryCalloc(1, 1024);
data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t));
data->boolData = taosMemoryMalloc(bindNum * sizeof(bool));
data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t));
data->utinyData = taosMemoryMalloc(bindNum * sizeof(uint8_t));
data->smallData = taosMemoryMalloc(bindNum * sizeof(int16_t));
data->usmallData = taosMemoryMalloc(bindNum * sizeof(uint16_t));
data->intData = taosMemoryMalloc(bindNum * sizeof(int32_t));
data->uintData = taosMemoryMalloc(bindNum * sizeof(uint32_t));
data->bigData = taosMemoryMalloc(bindNum * sizeof(int64_t));
data->ubigData = taosMemoryMalloc(bindNum * sizeof(uint64_t));
data->floatData = taosMemoryMalloc(bindNum * sizeof(float));
data->doubleData = taosMemoryMalloc(bindNum * sizeof(double));
data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize);
data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t));
if (gCurCase->bindNullNum) {
data->isNull = taosMemoryCalloc(bindNum, sizeof(char));
}
for (int32_t i = 0; i < bindNum; ++i) {
data->tsData[i] = tsData + tblIdx*gCurCase->rowNum + rand()%gCurCase->rowNum;
data->boolData[i] = (bool)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->tinyData[i] = (int8_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->utinyData[i] = (uint8_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->smallData[i] = (int16_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->usmallData[i] = (uint16_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->intData[i] = (int32_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->uintData[i] = (uint32_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->bigData[i] = (int64_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->ubigData[i] = (uint64_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->floatData[i] = (float)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen);
if (gCurCase->bindNullNum) {
data->isNull[i] = i % 2;
}
data->binaryLen[i] = gVarCharLen;
}
for (int b = 0; b < bindNum; b++) {
for (int c = 0; c < gCurCase->bindColNum; ++c) {
prepareColData(data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c);
}
}
generateQueryMiscSQL(data, tblIdx);
return 0;
}
void destroyData(BindData *data) {
......@@ -1385,7 +1508,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) {
for (int32_t t = 0; t< gCurCase->tblNum; ++t) {
memset(&data, 0, sizeof(data));
prepareQueryData(&data, t);
prepareQueryCondData(&data, t);
int code = taos_stmt_prepare(stmt, data.sql, 0);
if (code != 0){
......@@ -1431,7 +1554,7 @@ int querySUBTTest2(TAOS_STMT *stmt, TAOS *taos) {
for (int32_t t = 0; t< gCurCase->tblNum; ++t) {
memset(&data, 0, sizeof(data));
prepareQueryData(&data, t);
prepareQueryMiscData(&data, t);
int code = taos_stmt_prepare(stmt, data.sql, 0);
if (code != 0){
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册