提交 313248ce 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-17799

......@@ -56,7 +56,6 @@ enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
......@@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
int32_t type; // data block load type:
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
......
......@@ -283,6 +283,7 @@ typedef struct SCreateIndexStmt {
EIndexType indexType;
bool ignoreExists;
char indexName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
SNodeList* pCols;
SIndexOptions* pOptions;
......
......@@ -41,7 +41,7 @@ extern "C" {
#define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
......
......@@ -162,10 +162,7 @@ int32_t tNameGetDbName(const SName* name, char* dst) {
return 0;
}
const char* tNameGetDbNameP(const SName* name) {
return &name->dbname[0];
}
const char* tNameGetDbNameP(const SName* name) { return &name->dbname[0]; }
int32_t tNameGetFullDbName(const SName* name, char* dst) {
assert(name != NULL && dst != NULL);
......@@ -212,7 +209,6 @@ int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen) {
return 0;
}
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
assert(dst != NULL);
dst->acctId = acctId;
......@@ -266,11 +262,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
char* start = (char*)((p == NULL) ? str : (p + 1));
int32_t len = 0;
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t)strlen(start);
if (TS_ESCAPE_CHAR == *start) {
++start;
char* end = start;
while ('`' != *end) {
++end;
}
len = end - start;
p = ++end;
} else {
len = (int32_t)(p - start);
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t)strlen(start);
} else {
len = (int32_t)(p - start);
}
}
// too long account id or too long db name
......@@ -288,6 +294,10 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
// too long account id or too long db name
int32_t len = (int32_t)strlen(start);
if (TS_ESCAPE_CHAR == *start) {
len -= 2;
++start;
}
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
}
......@@ -340,7 +350,7 @@ void buildChildTableName(RandTableName* rName) {
char temp[8] = {0};
rName->childTableName[0] = 't';
rName->childTableName[1] = '_';
for(int i = 0; i < 16; i++){
for (int i = 0; i < 16; i++) {
sprintf(temp, "%02x", context.digest[i]);
strcat(rName->childTableName, temp);
}
......
......@@ -78,7 +78,10 @@ struct SMeta {
TTB* pTagIdx;
TTB* pTtlIdx;
TTB* pSmaIdx;
TTB* pSmaIdx;
TTB* pTaskIdx;
SMetaIdx* pIdx;
};
......
......@@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
......@@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err;
}
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx);
if (ret < 0) {
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open index
if (metaOpenIdx(pMeta) < 0) {
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
......@@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
......@@ -162,6 +170,7 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
......@@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0;
}
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int32_t uid1 = *(int32_t *)pKey1;
int32_t uid2 = *(int32_t *)pKey2;
if (uid1 > uid2) {
return 1;
} else if (uid1 < uid2) {
return -1;
}
return 0;
}
......@@ -695,7 +695,7 @@ FAIL:
return -1;
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit* pSubmit = NULL;
......@@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue;
qDebug("data submit enqueue stream task: %d", pTask->taskId);
qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
......
......@@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0);
}
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) {
if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
}
......
......@@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessStreamTrigger(pTq, data);
tqProcessStreamTrigger(pTq, data, ver);
}
return 0;
......
......@@ -177,7 +177,7 @@ SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* pRealTable, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pStreamOptions);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName);
......
......@@ -33,6 +33,8 @@
} else {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCOMPLETE_SQL);
}
} else if (TSDB_CODE_PAR_DB_NOT_SPECIFIED == pCxt->errCode && TK_NK_FLOAT == TOKEN.type) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, TOKEN.z);
}
}
......@@ -422,9 +424,7 @@ from_db_opt(A) ::= FROM db_name(B).
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, &B, NULL, C); }
//cmd ::= CREATE FULLTEXT INDEX not_exists_opt(D)
// index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, D, &A, &B, C, NULL); }
index_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) index_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, &A); }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
......
......@@ -1403,9 +1403,9 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
SNode* pRealTable, SNodeList* pCols, SNode* pOptions) {
CHECK_PARSER_STATUS(pCxt);
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName) || !checkDbName(pCxt, NULL, true)) {
if (!checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
......@@ -1413,7 +1413,9 @@ SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool igno
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
COPY_STRING_FORM_ID_TOKEN(pStmt->tableName, pTableName);
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
pStmt->pCols = pCols;
pStmt->pOptions = (SIndexOptions*)pOptions;
return (SNode*)pStmt;
......
......@@ -244,7 +244,10 @@ static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableS
}
static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
......@@ -252,7 +255,11 @@ static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTabl
}
static int32_t collectMetaKeyFromAlterStable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromUseDatabase(SCollectMetaKeyCxt* pCxt, SUseDatabaseStmt* pStmt) {
......
......@@ -434,7 +434,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf,
}
static bool isNullStr(SToken* pToken) {
return ((pToken->type == TK_NK_STRING) && (pToken->n != 0) &&
return ((pToken->type == TK_NK_STRING) && (strlen(TSDB_DATA_NULL_STR_L) == pToken->n) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
......
......@@ -886,8 +886,7 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
if (strict && (!IS_VAR_DATA_TYPE(pVal->node.resType.type) ||
pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
if (strict && (pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
......@@ -907,9 +906,6 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
break;
}
case TSDB_DATA_TYPE_NCHAR: {
if (strict && !IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
......@@ -4127,7 +4123,7 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
return NULL;
}
static int32_t checkAlterSuperTableImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
......@@ -4153,11 +4149,16 @@ static int32_t checkAlterSuperTableImpl(STranslateContext* pCxt, SAlterTableStmt
}
static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Set tag value only available for child table");
}
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Rename column only available for normal table");
}
if (pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_OPTIONS && -1 != pStmt->pOptions->ttl) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
......@@ -4170,10 +4171,21 @@ static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pS
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != dbCfg.pRetensions &&
(TSDB_ALTER_TABLE_ADD_COLUMN == pStmt->alterType || TSDB_ALTER_TABLE_DROP_COLUMN == pStmt->alterType ||
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Modifying the table schema is not supported in databases "
"configured with the 'RETENTIONS' option");
}
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableImpl(pCxt, pStmt, pTableMeta);
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableBySchema(pCxt, pStmt, pTableMeta);
}
taosMemoryFree(pTableMeta);
return code;
......@@ -4311,9 +4323,8 @@ static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->indexName, &name), pReq->name);
strcpy(name.tname, pStmt->tableName);
name.tname[strlen(pStmt->tableName)] = '\0';
tNameExtractFullName(&name, pReq->stb);
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb);
pReq->igExists = pStmt->ignoreExists;
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
......@@ -4391,7 +4402,7 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateFullTextReq* pReq) {
// impl later
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
......@@ -4405,12 +4416,10 @@ static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateInde
}
static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
if (INDEX_TYPE_SMA == pStmt->indexType) {
return translateCreateSmaIndex(pCxt, pStmt);
} else if (INDEX_TYPE_FULLTEXT == pStmt->indexType) {
if (INDEX_TYPE_FULLTEXT == pStmt->indexType) {
return translateCreateFullTextIndex(pCxt, pStmt);
}
return TSDB_CODE_FAILED;
return translateCreateSmaIndex(pCxt, pStmt);
}
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
......
......@@ -653,7 +653,7 @@ static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHa
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
int32_t len = snprintf(fullName, sizeof(fullName), "%d.`%s`.`%s`", acctId, pDb, pTable);
return reserveTableReqInCacheImpl(fullName, len, pTables);
}
......
此差异已折叠。
......@@ -779,8 +779,10 @@ static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName);
strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName);
strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
strcpy(pCol->colName, ((SColumnNode*)pExpr)->colName);
} else {
strcpy(pCol->colName, pExpr->aliasName);
}
strcpy(pCol->colName, pExpr->aliasName);
strcpy(pCol->node.aliasName, pExpr->aliasName);
pCol->node.resType = pExpr->resType;
return (SNode*)pCol;
......
......@@ -64,6 +64,12 @@ TEST_F(PlanSubqeuryTest, innerFill) {
"WHERE ts > '2022-04-06 00:00:00'");
}
TEST_F(PlanSubqeuryTest, innerOrderBy) {
useDb("root", "test");
run("SELECT c2 FROM (SELECT c2 FROM st1 ORDER BY c1, _rowts)");
}
TEST_F(PlanSubqeuryTest, outerInterval) {
useDb("root", "test");
......
......@@ -381,8 +381,8 @@ void cliHandleResp(SCliConn* conn) {
STraceId* trace = &transMsg.info.traceId;
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, transMsg.code);
tGTrace("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
......@@ -549,6 +549,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
} else {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
}
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
......@@ -756,8 +758,8 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
STraceId* trace = &pMsg->info.traceId;
tGTrace("%s conn %p %s is sent to %s, local info %s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
pConn->dst, pConn->src);
tGTrace("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn);
......
......@@ -253,11 +253,11 @@ static void uvHandleReq(SSvrConn* pConn) {
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d", transLabel(pTransInst), pConn,
tGTrace("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen);
} else {
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, resp:%d, code:%d", transLabel(pTransInst),
pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
tGTrace("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
}
// pHead->noResp = 1,
......@@ -296,11 +296,9 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) {
pBuf->len += nread;
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
if (transReadComplete(pBuf)) {
while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
uvHandleReq(conn);
} else {
tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn);
}
return;
}
......@@ -418,8 +416,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pMsg->info.traceId;
tGTrace("%s conn %p %s is sent to %s, local info:%s, msglen:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
tGTrace("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
pHead->msgLen = htonl(len);
wb->base = msg;
......
......@@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
if (pIdxFile == NULL) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (code < 0) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
......@@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) {
ASSERT(0);
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogFile == NULL) {
// TODO
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (code < 0) {
ASSERT(0);
// TODO
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
// validate offset
SWalCkHead head;
ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
ASSERT(taosValidFile(pLogFile));
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
......@@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
// truncate old files
code = taosFtruncateFile(pLogTFile, entry.offset);
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = taosFtruncateFile(pIdxTFile, idxOff);
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
}
taosCloseFile(&pIdxTFile);
taosCloseFile(&pLogTFile);
taosCloseFile(&pIdxFile);
taosCloseFile(&pLogFile);
walSaveMeta(pWal);
// unlock
taosThreadMutexUnlock(&pWal->mutex);
......
......@@ -26,8 +26,8 @@ endi
print =============== step3-1 insert records into ct1
sql insert into ct1 values('2022-05-03 16:59:00.010', 10, 20, 'n','n',30);
sql insert into ct1 values('2022-05-03 16:59:00.011', 'N', 'n', 'N',"N",30);
sql insert into ct1 values('2022-05-03 16:59:00.012', 'Nu', 'nul', 'Nul','NUL',30);
sql insert into ct1 values('2022-05-03 16:59:00.011', 'null', 'null', 'N',"N",30);
sql insert into ct1 values('2022-05-03 16:59:00.012', 'null', 'null', 'Nul','NUL',30);
sql insert into ct1 values('2022-05-03 16:59:00.013', NULL, 'null', 'Null',null,30);
sql insert into ct1 values('2022-05-03 16:59:00.014', NULL, 'NuLL', 'Null',NULL,30);
......
......@@ -253,34 +253,34 @@ class TDTestCase:
tdLog.printNoPrefix("==========step2.1.1 : alter stb schemaL drop column")
tdSql.query(f"select {BINT_COL} from {DB3}.{STBNAME}")
tdSql.execute(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
# TODO not support alter stable schema anymore
# tdSql.error(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
tdSql.error(f"select {BINT_COL} from {DB3}.{STBNAME}")
#tdSql.execute(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
# not support alter stable schema anymore
tdSql.error(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
#tdSql.error(f"select {BINT_COL} from {DB3}.{STBNAME}")
tdLog.printNoPrefix("==========step2.1.2 : alter stb schemaL add num_column")
# TODO not support alter stable schema anymore
# tdSql.error(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
# not support alter stable schema anymore
tdSql.error(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
tdSql.error(f"select {INT_COL}_1 from {DB3}.{STBNAME}")
tdSql.execute(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
tdSql.query(f"select count({INT_COL}_1) from {DB3}.{STBNAME} where _c0 > now-5m")
tdSql.checkData(0, 0, 0)
tdSql.execute(f"insert into {DB3}.{CTBNAME} ({PRIMARY_COL}, {INT_COL}, {INT_COL}_1) values({NOW}+20s, 111, 112)")
time.sleep(7)
tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-1h and _c0>{NOW}")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 111)
tdSql.checkData(0, 2, 112)
tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-2d and _c0>{NOW}")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 111)
tdSql.checkData(0, 2, 112)
tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-7d and _c0>{NOW}")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 111)
tdSql.checkData(0, 2, 112)
#tdSql.execute(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
#tdSql.query(f"select count({INT_COL}_1) from {DB3}.{STBNAME} where _c0 > now-5m")
#tdSql.checkData(0, 0, 0)
#tdSql.execute(f"insert into {DB3}.{CTBNAME} ({PRIMARY_COL}, {INT_COL}, {INT_COL}_1) values({NOW}+20s, 111, 112)")
#time.sleep(7)
#tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-1h and _c0>{NOW}")
#tdSql.checkRows(1)
#tdSql.checkData(0, 1, 111)
#tdSql.checkData(0, 2, 112)
#
#tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-2d and _c0>{NOW}")
#tdSql.checkRows(1)
#tdSql.checkData(0, 1, 111)
#tdSql.checkData(0, 2, 112)
#tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-7d and _c0>{NOW}")
#tdSql.checkRows(1)
#tdSql.checkData(0, 1, 111)
#tdSql.checkData(0, 2, 112)
tdLog.printNoPrefix("==========step2.1.3 : drop child-table")
tdSql.execute(f"drop table {DB3}.{CTBNAME} ")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册