未验证 提交 5ebc3779 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9940 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
......@@ -28,6 +28,7 @@ OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan)
OP_ENUM_MACRO(StreamBlockScan)
OP_ENUM_MACRO(Aggregate)
OP_ENUM_MACRO(Project)
// OP_ENUM_MACRO(Groupby)
......
......@@ -650,7 +650,13 @@ TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "use dbv");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from tu");
......
......@@ -376,7 +376,7 @@ typedef struct STaskParam {
typedef struct SExchangeInfo {
SArray *pSources;
int32_t bytes; // total load bytes from remote
uint64_t bytes; // total load bytes from remote
tsem_t ready;
void *pTransporter;
SRetrieveTableRsp *pRsp;
......@@ -385,7 +385,7 @@ typedef struct SExchangeInfo {
typedef struct STableScanInfo {
void *pTsdbReadHandle;
int32_t numOfBlocks;
int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
......@@ -415,7 +415,11 @@ typedef struct STagScanInfo {
} STagScanInfo;
typedef struct SStreamBlockScanInfo {
SSDataBlock *pRes; // result SSDataBlock
SColumnInfo *pCols; // the output column info
uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times
void *readerHandle;// stream block reader handle
} SStreamBlockScanInfo;
typedef struct SOptrBasicInfo {
......@@ -423,7 +427,6 @@ typedef struct SOptrBasicInfo {
int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx;
SSDataBlock *pRes;
void *keyBuf;
} SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
......
......@@ -12,7 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <parser.h>
#include "parser.h"
#include "tq.h"
#include "exception.h"
#include "os.h"
#include "tglobal.h"
......@@ -3576,7 +3577,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int64_t tid = 0;
pInfo->keyBuf = realloc(pInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
......@@ -5061,6 +5062,42 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
#endif
}
static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
// NOTE: this operator never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code;
return NULL;
}
if (pBlockInfo->rows == 0) {
return NULL;
}
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pTaskInfo->code = terrno;
return NULL;
}
break;
}
// record the scan action.
pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows;
return (pBlockInfo->rows == 0)? NULL:pInfo->pRes;
}
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SExchangeInfo* pEx = (SExchangeInfo*) param;
pEx->pRsp = pMsg->pData;
......@@ -5263,7 +5300,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
......@@ -5371,8 +5407,26 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator;
}
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->readerHandle = pStreamBlockHandle;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = OP_StreamBlockScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->exec = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo;
}
......
......@@ -680,10 +680,9 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseConte
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
destroyCreateTbReqBatch(&tbatch);
} else { // it is a child table, created according to a super table
code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
if (code != 0) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
......
......@@ -1656,7 +1656,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, cha
}
// Remove quotation marks
if (TK_STRING == type) {
if (TSDB_DATA_TYPE_BINARY == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
}
......
......@@ -181,20 +181,20 @@ void *threadFunc(void *param) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL));
exit(1);
}
pError("====before thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pError("====before thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pInfo->tableBeginIndex,
pInfo->tableEndIndex);
pInfo->tableBeginIndex += startOffset;
pInfo->tableEndIndex += startOffset;
pError("====after thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pError("====after thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pInfo->tableBeginIndex,
pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pRes);
......@@ -367,7 +367,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s dbName:%s %s", GREEN, dbName, NC);
pPrint("%s stbName:%s %s", GREEN, stbName, NC);
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
pPrint("%s startOffset:%" PRId64 " %s", GREEN, startOffset, NC);
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册