提交 1e061499 编写于 作者: H Haojun Liao

[td-11818] support prefix in show, and add if not exists check.

上级 2f3a58f8
......@@ -71,15 +71,15 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return NULL;
}
char tmp[TSDB_DB_NAME_LEN] = {0};
char localDb[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) {
if(!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL;
}
tstrncpy(tmp, db, sizeof(tmp));
strdequote(tmp);
tstrncpy(localDb, db, sizeof(localDb));
strdequote(localDb);
}
char secretEncrypt[32] = {0};
......@@ -122,7 +122,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
}
tfree(key);
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
return taosConnectImpl(ip, user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
}
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
......
......@@ -374,6 +374,12 @@ typedef struct STaskParam {
struct SUdfInfo *pUdfInfo;
} STaskParam;
typedef struct SExchangeInfo {
int32_t numOfSources;
SEpSet *pEpset;
int32_t bytes; // total load bytes from remote
} SExchangeInfo;
typedef struct STableScanInfo {
void *pTsdbReadHandle;
int32_t numOfBlocks;
......@@ -393,12 +399,9 @@ typedef struct STableScanInfo {
SSDataBlock block;
int32_t numOfOutput;
int64_t elapsedTime;
int32_t tableIndex;
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
STimeWindow window;
} STableScanInfo;
typedef struct STagScanInfo {
......@@ -542,8 +545,10 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
......
......@@ -3564,8 +3564,6 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCt
// }
// reverse order time range
SWAP(pTableScanInfo->window.skey, pTableScanInfo->window.ekey, TSKEY);
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
// setTaskStatus(pTableScanInfo, QUERY_NOT_COMPLETED);
......@@ -4913,10 +4911,85 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
pOperator->status = OP_EXEC_DONE;
return pBlock;
#endif
}
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
*newgroup = false;
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) { // todo handle malloc error
}
SEpSet epSet;
int64_t sId = -1, queryId = 0, taskId = 1, vgId = 1;
pMsg->header.vgId = htonl(vgId);
pMsg->sId = htobe64(sId);
pMsg->taskId = htobe64(taskId);
pMsg->queryId = htobe64(queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", queryId, taskId, (int32_t)sizeof(SMsgSendInfo));
}
pMsgSendInfo->param = NULL;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
void* pTransporter = NULL;
int32_t code = asyncSendMsgToServer(pTransporter, &epSet, &transporterId, pMsgSendInfo);
printf("abc\n");
getchar();
// add it into the sink node
}
SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
assert(numOfSources > 0);
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->numOfSources = numOfSources;
pOperator->name = "ExchangeOperator";
pOperator->operatorType = OP_Exchange;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = NULL;
pOperator->exec = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0 && numOfOutput > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -4995,7 +5068,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
// pOperator->operatorType = OP_TableSeqScan;
pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7284,7 +7357,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
return createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
return createTableScanOperatorInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
} else if (pPhyNode->info.type == OP_DataBlocksOptScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
......
......@@ -8,7 +8,7 @@
SCreateUserReq* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf);
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
......
......@@ -85,8 +85,12 @@ SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha
return pMsg;
}
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext *pCtx, char* msgBuf, int32_t msgLen) {
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf) {
SShowReq* pShowMsg = calloc(1, sizeof(SShowReq));
if (pShowMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return pShowMsg;
}
pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
......@@ -105,7 +109,18 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext *pCtx, char* msgBuf,
if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) {
SName n = {0};
tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db));
if (pShowInfo->prefix.n > 0) {
if (pShowInfo->prefix.n >= TSDB_DB_FNAME_LEN) {
terrno = buildInvalidOperationMsg(pMsgBuf, "prefix name is too long");
tfree(pShowMsg);
return NULL;
}
tNameSetDbName(&n, pCtx->acctId, pShowInfo->prefix.z, pShowInfo->prefix.n);
} else {
tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db));
}
tNameGetFullDbName(&n, pShowMsg->db);
}
......@@ -240,6 +255,9 @@ SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len
}
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema));
if (pCreateStbMsg == NULL) {
return NULL;
}
char* pMsg = NULL;
#if 0
......
......@@ -109,7 +109,11 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
}
*pEpSet = pCtx->mgmtEpSet;
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf);
if (*output == NULL) {
return terrno;
}
*outputLen = sizeof(SShowReq) /* + htons(pShowMsg->payloadLen)*/;
}
......@@ -312,9 +316,9 @@ int32_t doCheckForCreateTable(SCreateTableSql* pCreateTable, SMsgBuf* pMsgBuf) {
assert(pFieldList != NULL);
// if sql specifies db, use it, otherwise use default db
SToken* pzTableName = &(pCreateTable->name);
SToken* pNameToken = &(pCreateTable->name);
if (parserValidateNameToken(pzTableName) != TSDB_CODE_SUCCESS) {
if (parserValidateIdToken(pNameToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册