提交 dce38817 编写于 作者: X Xiaoyu Wang

merge 3.0

上级 22a26d3d
......@@ -52,7 +52,8 @@ typedef struct SQuery {
SSchema* pResSchema;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
bool streamQuery;
SArray* pDbList;
SArray* pTableList;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -86,7 +86,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
* @param size
* @return
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size);
/**
* return the payload data with the specified key
......
......@@ -159,8 +159,12 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
}
code = qParseQuerySql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code && ((*pQuery)->haveResultSet)) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
if (TSDB_CODE_SUCCESS == code) {
if ((*pQuery)->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
}
TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
}
return code;
......
......@@ -238,7 +238,11 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
free(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
tsem_post(&pRequest->body.rspSem);
return code;
}
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
......
......@@ -5337,11 +5337,13 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = size;
pOperator->getNextFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
#if 1
{ // todo refactor
......
......@@ -515,7 +515,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
CHECK_OUT_OF_MEM(val);
val->literal = strndup(pLiteral->z, pLiteral->n);
if (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType) {
if (TK_NK_ID != pLiteral->type && (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) {
trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n);
}
CHECK_OUT_OF_MEM(val->literal);
......
......@@ -30,8 +30,14 @@ typedef struct STranslateContext {
ESqlClause currClause;
SSelectStmt* pCurrStmt;
SCmdMsgInfo* pCmdMsg;
SHashObj* pDbs;
SHashObj* pTables;
} STranslateContext;
typedef struct SFullDatabaseName {
char fullDbName[TSDB_DB_FNAME_LEN];
} SFullDatabaseName;
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
......@@ -78,64 +84,96 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName
return pName;
}
static int32_t getTableMetaImpl(SParseContext* pCxt, const SName* pName, STableMeta** pMeta) {
int32_t code = catalogGetTableMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pMeta);
static int32_t collectUseDatabase(const char* pFullDbName, SHashObj* pDbs) {
SFullDatabaseName name = {0};
strcpy(name.fullDbName, pFullDbName);
return taosHashPut(pDbs, pFullDbName, strlen(pFullDbName), &name, sizeof(SFullDatabaseName));
}
static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return taosHashPut(pDbs, fullName, strlen(fullName), pName, sizeof(SName));
}
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
}
return code;
}
static int32_t getTableMeta(SParseContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pDbName);
strcpy(name.tname, pTableName);
return getTableMetaImpl(pCxt, &name, pMeta);
}
static int32_t getTableDistVgInfo(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) {
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pVgInfo);
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
}
return code;
}
static int32_t getDBVgInfoImpl(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) {
static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
char fullDbName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pName, fullDbName);
int32_t code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, pVgInfo);
int32_t code = collectUseDatabase(fullDbName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName);
}
return code;
}
static int32_t getDBVgInfo(SParseContext* pCxt, const char* pDbName, SArray** pVgInfo) {
static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray** pVgInfo) {
SName name;
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName));
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
return getDBVgInfoImpl(pCxt, &name, pVgInfo);
}
static int32_t getTableHashVgroupImpl(SParseContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pInfo);
static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
}
return code;
}
static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pDbName);
strcpy(name.tname, pTableName);
return getTableHashVgroupImpl(pCxt, &name, pInfo);
}
static int32_t getDBVgVersion(SParseContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
int32_t code = catalogGetDBVgVersion(pCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName);
}
......@@ -559,7 +597,7 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t setSysTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
// todo release
// if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// return TSDB_CODE_SUCCESS;
......@@ -586,8 +624,8 @@ static int32_t setSysTableVgroupList(SParseContext* pCxt, SName* pName, SRealTab
return code;
}
static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->topicQuery) {
static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->pParseCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
}
......@@ -618,12 +656,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
case QUERY_NODE_REAL_TABLE: {
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
SName name;
code = getTableMetaImpl(pCxt->pParseCxt,
code = getTableMetaImpl(pCxt,
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta));
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
}
code = setTableVgroupList(pCxt->pParseCxt, &name, pRealTable);
code = setTableVgroupList(pCxt, &name, pRealTable);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -1020,7 +1058,7 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
STableMeta* pTableMeta = NULL;
SName tableName;
int32_t code = getTableMetaImpl(
pCxt->pParseCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists);
......@@ -1113,7 +1151,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameExtractFullName(&name, usedbReq.db);
int32_t code = getDBVgVersion(pCxt->pParseCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
int32_t code = getDBVgVersion(pCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -1319,7 +1357,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL;
int32_t code = getDBVgInfo(pCxt->pParseCxt, pCxt->pParseCxt->db, &array);
int32_t code = getDBVgInfo(pCxt, pCxt->pParseCxt->db, &array);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -1663,6 +1701,9 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
tfree(pCxt->pCmdMsg->pMsg);
tfree(pCxt->pCmdMsg);
}
taosHashCleanup(pCxt->pDbs);
taosHashCleanup(pCxt->pTables);
}
static const char* getSysTableName(ENodeType type) {
......@@ -1938,7 +1979,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
SVgroupInfo info = {0};
int32_t code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
SArray* pBufArray = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
......@@ -2078,7 +2119,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
STableMeta* pSuperTableMeta = NULL;
int32_t code = getTableMeta(pCxt->pParseCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
int32_t code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
SKVRowBuilder kvRowBuilder = {0};
if (TSDB_CODE_SUCCESS == code) {
......@@ -2105,7 +2146,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
......@@ -2227,6 +2268,31 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->msgType = pQuery->pCmdMsg->msgType;
break;
}
if (NULL != pCxt->pDbs) {
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
if (NULL == pQuery->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL);
while (NULL != pDb) {
taosArrayPush(pQuery->pDbList, pDb->fullDbName);
pDb = taosHashIterate(pCxt->pDbs, pDb);
}
}
if (NULL != pCxt->pTables) {
pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName));
if (NULL == pQuery->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName* pTable = taosHashIterate(pCxt->pTables, NULL);
while (NULL != pTable) {
taosArrayPush(pQuery->pTableList, pTable);
pTable = taosHashIterate(pCxt->pTables, pTable);
}
}
return code;
}
......@@ -2237,8 +2303,13 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
.msgBuf = { .buf = pParseCxt->pMsg, .len = pParseCxt->msgLen },
.pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES),
.currLevel = 0,
.currClause = 0
.currClause = 0,
.pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK),
.pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK)
};
if (NULL == cxt.pNsLevel) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = fmFuncMgtInit();
if (TSDB_CODE_SUCCESS == code) {
code = rewriteQuery(&cxt, pQuery);
......
......@@ -305,7 +305,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
return (int32_t)atomic_load_64(&pHashObj->size);
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
if (pHashObj == NULL || key == NULL || keyLen == 0) {
return -1;
}
......
......@@ -39,10 +39,11 @@ endi
print =============== drop database
sql drop database d1
sql show databases
if $rows != 1 then
return -1
endi
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
print =============== more databases
sql create database d2 vgroups 2
......
......@@ -20,7 +20,7 @@ sql show databases
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $rows != 1 then
if $rows != 2 then
return -1
endi
if $data00 != $db then
......@@ -52,16 +52,17 @@ print =============== step2
sql_error create database $db
sql create database if not exists $db
sql show databases
if $rows != 1 then
if $rows != 2 then
return -1
endi
print =============== step3
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
print =============== step4
sql_error drop database $db
......
......@@ -16,11 +16,17 @@ create1:
return -1
endi
# todo remove
sql create database useless_db
sql show dnodes
if $data4_2 != ready then
goto create1
endi
# todo remove
sql drop database useless_db
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
......@@ -42,7 +48,7 @@ re-create1:
sql create database d1 vgroups 2 -x re-create1
sql show databases
if $rows != 1 then
if $rows != 2 then
return -1
endi
......@@ -80,7 +86,7 @@ re-create2:
sql create database d1 vgroups 5 -x re-create2
sql show databases
if $rows != 1 then
if $rows != 2 then
return -1
endi
......
......@@ -6,7 +6,7 @@ sql connect
print =============== create database
sql create database d1
sql show databases
if $rows != 1 then
if $rows != 2 then
return -1
endi
......
......@@ -4,14 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect
# todo remove
print =============== create database
sleep 3000
sql create database useless_db
sleep 1000
sql show databases
if $rows != 1 then
return -1
endi
print =============== show users
sql show users
......@@ -81,4 +74,7 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
# todo remove
sql drop database useless_db
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册