提交 4b2ab16e 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0_query_integrate' into feature/3.0_query_integrate_wxy

......@@ -30,13 +30,13 @@ typedef int64_t tb_uid_t;
#define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX))
typedef enum {
TSDB_SUPER_TABLE = 1, // super table
TSDB_CHILD_TABLE = 2, // table created from super table
TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_SUPER_TABLE = 1, // super table
TSDB_CHILD_TABLE = 2, // table created from super table
TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_SYSTEM_TABLE = 6,
TSDB_TABLE_MAX = 7
TSDB_TABLE_MAX = 7
} ETableType;
typedef enum {
......
......@@ -879,6 +879,17 @@ typedef struct {
char data[];
} SRetrieveTableRsp;
typedef struct {
int64_t handle;
int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;
int8_t compressed;
int32_t compLen;
int32_t numOfRows;
char data[];
} SRetrieveMetaTableRsp;
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port;
......
......@@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData {
typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // asc|desc
uint32_t order; // data block scanner order: asc|desc
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
......
......@@ -1732,6 +1732,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
tEndEncode(&encoder);
......@@ -1746,6 +1747,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
tEndDecode(&decoder);
......
......@@ -102,6 +102,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYSTABLE_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
......
......@@ -1342,123 +1342,133 @@ char *mnGetDbStr(char *src) {
return pos;
}
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
char *pWrite;
static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
}
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity) {
int32_t cols = 0;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
if (pShow->pIter == NULL) break;
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char *name = mnGetDbStr(pDb->name);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
} else {
STR_TO_VARSTR(pWrite, "NULL");
}
cols++;
cols = 0;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int64_t *)pWrite = pDb->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *name = mnGetDbStr(pDb->name);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
} else {
STR_TO_VARSTR(pWrite, "NULL");
}
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pDb->createdTime;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int64_t *)pWrite = 0; // todo: num of Tables
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.replications;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.quorum;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = 0; // todo
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char tmp[128] = {0};
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
} else {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
}
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.replications;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.quorum;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.minRows;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char tmp[128] = {0};
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
} else {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
}
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.minRows;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxRows;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.walLevel;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.compression;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *prec = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
prec = TSDB_TIME_PRECISION_MILLI_STR;
break;
case TSDB_TIME_PRECISION_MICRO:
prec = TSDB_TIME_PRECISION_MICRO_STR;
break;
case TSDB_TIME_PRECISION_NANO:
prec = TSDB_TIME_PRECISION_NANO_STR;
break;
default:
prec = "none";
break;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.maxRows;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.walLevel;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.compression;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char *prec = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
prec = TSDB_TIME_PRECISION_MILLI_STR;
break;
case TSDB_TIME_PRECISION_MICRO:
prec = TSDB_TIME_PRECISION_MICRO_STR;
break;
case TSDB_TIME_PRECISION_NANO:
prec = TSDB_TIME_PRECISION_NANO_STR;
break;
default:
prec = "none";
break;
}
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.update;
}
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
if (pShow->pIter == NULL) {
break;
}
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.update;
cols++;
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity);
numOfRows++;
sdbRelease(pSdb, pDb);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
// Append the information_schema database into the result.
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
......
......@@ -39,24 +39,24 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt
{.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "replica", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "quorum", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "days", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "keep", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "replica", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "quorum", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "days", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wallevel", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "comp", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "cachelast", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
};
static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......
......@@ -284,6 +284,20 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
strncpy(req.db, retrieveReq.db, tListLen(req.db));
pShow = mndCreateShowObj(pMnode, &req);
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USER_DATABASES, strlen(TSDB_INS_TABLE_USER_DATABASES));
pShow->numOfRows = 100;
int32_t offset = 0;
for(int32_t i = 0; i < meta->numOfColumns; ++i) {
pShow->numOfColumns = meta->numOfColumns;
pShow->offset[i] = offset;
int32_t bytes = meta->pSchemas[i].bytes;
pShow->rowSize += bytes;
pShow->bytes[i] = bytes;
offset += bytes;
}
if (pShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta req since %s", terrstr());
......@@ -330,7 +344,7 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
size = pShow->rowSize * rowsToRead;
size += SHOW_STEP_SIZE;
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj((SShowObj*) pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -338,6 +352,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
return -1;
}
pRsp->handle = htobe64(pShow->id);
// if free flag is set, client wants to clean the resources
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
......
......@@ -391,6 +391,12 @@ typedef struct SSourceDataInfo {
int32_t status;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
......@@ -399,9 +405,7 @@ typedef struct SExchangeInfo {
SSDataBlock* pResult;
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
SLoadRemoteDataInfo loadInfo;
} SExchangeInfo;
typedef struct STableScanInfo {
......@@ -440,14 +444,23 @@ typedef struct SStreamBlockScanInfo {
void* readerHandle; // stream block reader handle
} SStreamBlockScanInfo;
typedef struct SSysScanResInfo {
struct SSysTableScanInfo *pSysScanInfo;
SRetrieveTableRsp *pRsp;
uint64_t totalRows;
} SSysScanResInfo;
typedef struct SSysTableScanInfo {
union {
void* pTransporter;
void* readHandle;
};
SRetrieveMetaTableRsp *pRsp;
void *pCur; // cursor
SRetrieveTableReq* pReq;
SRetrieveTableReq req;
SEpSet epSet;
int32_t type; // show type
tsem_t ready;
......@@ -457,8 +470,7 @@ typedef struct SSysTableScanInfo {
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
int64_t totalRows;
int64_t elapsedTime;
int64_t totalBytes;
SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo;
typedef struct SOptrBasicInfo {
......@@ -639,8 +651,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, SEpSet epset,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
......
......@@ -4951,34 +4951,37 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
return TSDB_CODE_SUCCESS;
}
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) {
char* pData = pDataInfo->pRsp->data;
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen,
int32_t numOfOutput, int64_t startTs, uint64_t* total) {
// char* pData = pRsp->data;
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows);
if (tmp == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
size_t len = numOfRows * pColInfoData->info.bytes;
memcpy(tmp, pData, len);
pColInfoData->pData = tmp;
pData += len;
}
pRes->info.rows = pRsp->numOfRows;
pRes->info.rows = numOfRows;
int64_t el = taosGetTimestampUs() - startTs;
pExchangeInfo->totalRows += pRsp->numOfRows;
pExchangeInfo->totalSize += pRsp->compLen;
pDataInfo->totalRows += pRsp->numOfRows;
pLoadInfo->totalRows += numOfRows;
pLoadInfo->totalSize += compLen;
pExchangeInfo->totalElapsed += el;
if (total != NULL) {
*total += numOfRows;
}
pLoadInfo->totalElapsed += el;
return TSDB_CODE_SUCCESS;
}
......@@ -4988,11 +4991,12 @@ static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs;
pExchangeInfo->totalElapsed += el;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
doSetOperatorCompleted(pOperator);
return NULL;
......@@ -5021,17 +5025,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SSDataBlock* pRes = pExchangeInfo->pResult;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->totalRows);
pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = DATA_EXHAUSTED;
completed += 1;
continue;
}
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
if (code != 0) {
goto _error;
}
......@@ -5040,13 +5046,13 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1,
totalSources);
pDataInfo->status = DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
pExchangeInfo->totalSize);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
}
if (pDataInfo->status != DATA_EXHAUSTED) {
......@@ -5118,10 +5124,12 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pExchangeInfo->totalRows);
pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1;
......@@ -5129,20 +5137,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
}
SSDataBlock* pRes = pExchangeInfo->pResult;
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize);
}
return pExchangeInfo->pResult;
......@@ -5156,10 +5166,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
return NULL;
}
......@@ -5405,18 +5416,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
return pOperator;
}
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param;
pSourceDataInfo->pRsp = pMsg->pData;
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param;
pScanResInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->handle = htobe64(pRsp->handle);
pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
tsem_post(&pScanResInfo->ready);
}
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
......@@ -5450,15 +5459,12 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
// pInfo->totalBytes;
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
} else { // load the meta from mnode of the given epset
if (pInfo->pReq == NULL) {
pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq));
if (pInfo->pReq == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int64_t startTs = taosGetTimestampUs();
pInfo->pReq->type = pInfo->type;
}
pInfo->req.type = pInfo->type;
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = calloc(1, contLen);
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
......@@ -5468,24 +5474,40 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
return NULL;
}
pMsgSendInfo->param = NULL;
pMsgSendInfo->msgInfo.pData = pInfo->pReq;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
pMsgSendInfo->param = pInfo;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->fp = loadRemoteDataCallback;
pMsgSendInfo->fp = loadSysTableContentCb;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
// handle the response and return to the caller
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle;
if (pRsp->numOfRows == 0) {
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
// pDataInfo->totalRows, pExchangeInfo->totalRows);
return NULL;
}
SSDataBlock* pRes = pInfo->pRes;
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
return pInfo->pRes;
}
return NULL;
}
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType,
SEpSet epset, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5495,7 +5517,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
return NULL;
}
// todo: create the schema of result data block
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->type = tableType;
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
......@@ -5512,11 +5534,34 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->nextDataFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
#if 1
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "DB-META";
rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root";
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
pInfo->pTransporter = rpcOpen(&rpcInit);
if (pInfo->pTransporter == NULL) {
return NULL; // todo
}
}
#endif
return pOperator;
}
......@@ -7266,16 +7311,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
int32_t numOfRows = 4096;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
pInfo->binfo.capacity = 4096;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator";
// pOperator->operatorType = OP_Project;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7283,6 +7327,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doProjectOperation;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -8137,6 +8182,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, TSDB_MGMT_TABLE_DB, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
return pOperator;
} else {
ASSERT(0);
}
}
......
......@@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void firstFunction(SqlFunctionCtx *pCtx);
void lastFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus
}
#endif
......
......@@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = maxFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "first",
.type = FUNCTION_TYPE_FIRST,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "last",
.type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "concat",
.type = FUNCTION_TYPE_CONCAT,
......@@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
break;
}
case FUNCTION_TYPE_FIRST:
case FUNCTION_TYPE_LAST:
case FUNCTION_TYPE_MIN:
case FUNCTION_TYPE_MAX: {
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
......
......@@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
* 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
* 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
*/
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
ASSERT(numOfElem >= 0);
......@@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) {
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
}
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSumRes);
return true;
}
......@@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return true;
}
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(int64_t);
return true;
}
......@@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t* d = (_t*)((_col)->pData); \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
......@@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) {
void maxFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
}
\ No newline at end of file
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes;
return true;
}
// TODO fix this
// This ordinary first function only handle the data block in ascending order
void firstFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) {
return;
}
int32_t numOfElems = 0;
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
ASSERT(pInputCol->hasNull == true);
return;
}
// Check for the first not null data
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
memcpy(buf, data, pInputCol->info.bytes);
// TODO handle the subsidary value
// if (pCtx->ptsList != NULL) {
// TSKEY k = GET_TS_DATA(pCtx, i);
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
// }
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true;
numOfElems++;
break;
}
SET_VAL(pResInfo, numOfElems, 1);
}
void lastFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order != TSDB_ORDER_DESC) {
return;
}
int32_t numOfElems = 0;
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
ASSERT(pInputCol->hasNull == true);
return;
}
if (pCtx->order == TSDB_ORDER_DESC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
memcpy(buf, data, pInputCol->info.bytes);
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true; // set query completed on this column
numOfElems++;
break;
}
} else { // ascending order
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {
pResInfo->hasResult = DATA_SET_FLAG;
memcpy(buf, data, pCtx->inputBytes);
*(TSKEY*)buf = ts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
numOfElems++;
break;
}
}
SET_VAL(pResInfo, numOfElems, 1);
}
......@@ -56,7 +56,7 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot };
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册