未验证 提交 8fa19459 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9753 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
......@@ -91,8 +91,10 @@ typedef struct SPhyNode {
typedef struct SScanPhyNode {
SPhyNode node;
uint64_t uid; // unique id of the table
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
} SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode;
......
......@@ -93,13 +93,12 @@ typedef struct SReqResultInfo {
const char *pData;
TAOS_FIELD *fields;
uint32_t numOfCols;
int32_t *length;
TAOS_ROW row;
char **pCol;
uint32_t numOfRows;
uint32_t current;
bool completed;
} SReqResultInfo;
typedef struct SShowReqInfo {
......
......@@ -25,8 +25,9 @@
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) {
static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) {
return false;
}
......@@ -212,6 +213,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag)
SSubplan* pPlan = taosArrayGetP(pa, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
setResSchemaInfo(pResInfo, pDataBlockSchema);
pRequest->type = TDMT_VND_QUERY;
}
......@@ -228,7 +230,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlo
SSchema* pSchema = &pDataBlockSchema->pSchema[i];
pResInfo->fields[i].bytes = pSchema->bytes;
pResInfo->fields[i].type = pSchema->type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
}
}
......@@ -545,11 +547,15 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH;
// All data has returned to App already, no need to try again
if (pResultInfo->completed) {
return NULL;
}
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) {
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
if (pResultInfo->numOfRows == 0) {
return NULL;
}
......@@ -611,12 +617,23 @@ _return:
return pResultInfo->row;
}
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}
}
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) {
return;
}
// todo check for the failure of malloc
doPrepareResPtr(pResultInfo);
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pResultInfo->length[i] = pResultInfo->fields[i].bytes;
......@@ -642,3 +659,14 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock(&pTscObj->mutex);
}
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*) pRsp;
pResultInfo->pData = (void*) pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1);
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}
\ No newline at end of file
......@@ -154,9 +154,6 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pResInfo->fields = pFields;
pResInfo->numOfCols = pMetaMsg->numOfColumns;
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
pRequest->body.showInfo.execId = pShow->showId;
......
......@@ -18,7 +18,6 @@
#include "ttime.h"
#include "exception.h"
#include "../../../../contrib/cJson/cJSON.h"
#include "executorimpl.h"
#include "function.h"
#include "tcompare.h"
......@@ -7188,31 +7187,47 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* param) {
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*) pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
SOperatorInfo* pOperatorInfo = createTableScanOperator(param, TSDB_ORDER_ASC, numOfCols, 1, pTaskInfo);
SOperatorInfo* pOperatorInfo = createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
pTaskInfo->pRoot = pOperatorInfo;
} else {
assert(0);
}
}
}
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
STsdbQueryCond cond = {.order = TSDB_ORDER_ASC, .numOfCols = 2, .loadExternalRows = false};
STsdbQueryCond cond = {.loadExternalRows = false};
cond.twindow.skey = INT64_MIN;
cond.twindow.ekey = INT64_MAX;
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
// todo set the correct table column info
cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
cond.colList[0].bytes = sizeof(uint64_t);
cond.colList[0].colId = 1;
uint64_t uid = 0;
SPhyNode* pPhyNode = pPlan->pNode;
if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanNode = (SScanPhyNode*) pPhyNode;
uid = pScanNode->uid;
cond.order = pScanNode->order;
cond.numOfCols = taosArrayGetSize(pScanNode->node.pTargets);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
cond.colList[1].type = TSDB_DATA_TYPE_INT;
cond.colList[1].bytes = sizeof(int32_t);
cond.colList[1].colId = 2;
for(int32_t i = 0; i < cond.numOfCols; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pScanNode->node.pTargets, i);
assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE);
SSchema* pSchema = pExprInfo->pExpr->pSchema;
cond.colList[i].type = pSchema->type;
cond.colList[i].bytes = pSchema->bytes;
cond.colList[i].colId = pSchema->colId;
}
} else {
assert(0);
}
STableGroupInfo group = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES)};
SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = 1};
STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
taosArrayPush(pa, &info);
taosArrayPush(group.pGroupList, &pa);
......
......@@ -160,11 +160,14 @@ void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t
taosArrayInsert(pExprList, index, &pExprInfo);
}
#if 0
if (pExprInfo->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
printf("add function: %s, level:%d, total:%ld\n", pExprInfo->pExpr->_function.functionName, level, taosArrayGetSize(pExprList));
} else {
printf("add operator: %s, level:%d, total:%ld\n", pExprInfo->base.resSchema.name, level, taosArrayGetSize(pExprList));
}
#endif
}
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize) {
......
......@@ -156,9 +156,14 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si
}
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
node->uid = pTable->pMeta->pTableMeta->uid;
node->tableType = pTable->pMeta->pTableMeta->tableType;
SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);
STableMeta *pTableMeta = pTable->pMeta->pTableMeta;
node->uid = pTableMeta->uid;
node->count = 1;
node->order = TSDB_ORDER_ASC;
node->tableType = pTableMeta->tableType;
return (SPhyNode*)node;
}
......@@ -176,10 +181,10 @@ static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
return MAIN_SCAN;
}
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
node->scanFlag = getScanFlag(pPlanNode, pTable);
node->window = pTable->window;
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
node->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
node->window = pQueryTableInfo->window;
// todo tag cond
return (SPhyNode*)node;
}
......
......@@ -136,7 +136,7 @@ static const cJSON* getArray(const cJSON* json, const char* name, int32_t* size)
static bool fromItem(const cJSON* jArray, FFromJson func, void* array, int32_t itemSize, int32_t size) {
for (int32_t i = 0; i < size; ++i) {
if (!func(cJSON_GetArrayItem(jArray, i), (char*)array + itemSize)) {
if (!func(cJSON_GetArrayItem(jArray, i), (char*)array + itemSize * i)) {
return false;
}
}
......@@ -165,9 +165,7 @@ static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, vo
static char* getString(const cJSON* json, const char* name) {
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
char* res = calloc(1, strlen(p) + 1);
strcpy(res, p);
return res;
return strdup(p);
}
static void copyString(const cJSON* json, const char* name, char* dst) {
......@@ -285,7 +283,7 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) {
static bool columnInfoFromJson(const cJSON* json, void* obj) {
SColumnInfo* col = (SColumnInfo*)obj;
col->colId = getNumber(json, jkColumnInfoColId);
col->type = getNumber(json, jkColumnInfoType);
col->type = getNumber(json, jkColumnInfoType);
col->bytes = getNumber(json, jkColumnInfoBytes);
int32_t size = 0;
bool res = fromRawArrayWithAlloc(json, jkColumnInfoFilterList, columnFilterInfoFromJson, (void**)&col->flist.filterInfo, sizeof(SColumnFilterInfo), &size);
......@@ -530,13 +528,25 @@ static bool timeWindowFromJson(const cJSON* json, void* obj) {
static const char* jkScanNodeTableId = "TableId";
static const char* jkScanNodeTableType = "TableType";
static const char* jkScanNodeTableOrder = "Order";
static const char* jkScanNodeTableCount = "Count";
static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* scan = (const SScanPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid);
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count);
}
return res;
}
......@@ -544,6 +554,8 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* scan = (SScanPhyNode*)obj;
scan->uid = getNumber(json, jkScanNodeTableId);
scan->tableType = getNumber(json, jkScanNodeTableType);
scan->count = getNumber(json, jkScanNodeTableCount);
scan->order = getNumber(json, jkScanNodeTableOrder);
return true;
}
......@@ -748,9 +760,11 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
}
static bool phyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* node = (SPhyNode*)obj;
SPhyNode* node = (SPhyNode*) obj;
node->info.name = getString(json, jkPnodeName);
node->info.type = opNameToOpType(node->info.name);
bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo));
if (res) {
res = fromArray(json, jkPnodeConditions, exprInfoFromJson, &node->pConditions, sizeof(SExprInfo));
......@@ -896,7 +910,8 @@ static SSubplan* subplanFromJson(const cJSON* json) {
}
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
if (res) {
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, sizeof(SPhyNode), false);
size_t size = MAX(sizeof(SPhyNode), sizeof(SScanPhyNode));
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false);
}
if (res) {
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
......@@ -925,8 +940,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
}
*str = cJSON_Print(json);
printf("%s\n", *str);
// printf("====Physical plan:====\n")
// printf("%s\n", *str);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if (pLogicPlan->info.type != QNODE_MODIFY) {
char* str = NULL;
queryPlanToString(pLogicPlan, &str);
printf("%s\n", str);
// char* str = NULL;
// queryPlanToString(pLogicPlan, &str);
// printf("%s\n", str);
}
code = optimizeQueryPlan(pLogicPlan);
......
......@@ -798,7 +798,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code));
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
......@@ -1364,16 +1364,10 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, false));
*pJob = job;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, false));
return TSDB_CODE_SUCCESS;
}
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
......@@ -160,7 +160,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
}
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
}
......@@ -218,8 +218,6 @@ void *schtSendRsp(void *param) {
}
struct SSchJob *pInsertJob = NULL;
}
TEST(queryTest, normalCase) {
......@@ -336,7 +334,7 @@ TEST(insertTest, normalCase) {
uint64_t numOfRows = 0;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册