未验证 提交 6ea101d5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3950 from taosdata/feature/query

Feature/query
...@@ -149,14 +149,13 @@ int tscAllocPayload(SSqlCmd* pCmd, int size); ...@@ -149,14 +149,13 @@ int tscAllocPayload(SSqlCmd* pCmd, int size);
TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes); TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes);
SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField); SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField);
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field); SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field);
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index); SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
...@@ -231,10 +230,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); ...@@ -231,10 +230,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes); void tscResetForNextRetrieve(SSqlRes* pRes);
void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex);
void tscDoQuery(SSqlObj* pSql); void tscDoQuery(SSqlObj* pSql);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src);
/** /**
* The create object function must be successful expect for the out of memory issue. * The create object function must be successful expect for the out of memory issue.
* *
......
...@@ -90,10 +90,10 @@ typedef struct STableComInfo { ...@@ -90,10 +90,10 @@ typedef struct STableComInfo {
} STableComInfo; } STableComInfo;
typedef struct SCMCorVgroupInfo { typedef struct SCMCorVgroupInfo {
int32_t version; int32_t version;
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA]; SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo; } SCMCorVgroupInfo;
typedef struct STableMeta { typedef struct STableMeta {
...@@ -142,16 +142,17 @@ typedef struct SColumnIndex { ...@@ -142,16 +142,17 @@ typedef struct SColumnIndex {
int16_t columnIndex; int16_t columnIndex;
} SColumnIndex; } SColumnIndex;
typedef struct SFieldSupInfo { typedef struct SInternalField {
TAOS_FIELD field;
bool visible; bool visible;
SExprInfo *pArithExprInfo; SExprInfo *pArithExprInfo;
SSqlExpr *pSqlExpr; SSqlExpr *pSqlExpr;
} SFieldSupInfo; } SInternalField;
typedef struct SFieldInfo { typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result int16_t numOfOutput; // number of column in result
SArray *pFields; // SArray<TAOS_FIELD> TAOS_FIELD* final;
SArray *pSupportInfo; // SArray<SFieldSupInfo> SArray *internalField; // SArray<SInternalField>
} SFieldInfo; } SFieldInfo;
typedef struct SColumn { typedef struct SColumn {
...@@ -443,6 +444,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql); ...@@ -443,6 +444,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql);
*/ */
void tscFreeSqlObj(SSqlObj *pSql); void tscFreeSqlObj(SSqlObj *pSql);
void tscFreeRegisteredSqlObj(void *pSql); void tscFreeRegisteredSqlObj(void *pSql);
void tscFreeTableMetaHelper(void *pTableMeta);
void tscCloseTscObj(STscObj *pObj); void tscCloseTscObj(STscObj *pObj);
...@@ -467,7 +469,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s ...@@ -467,7 +469,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex);
assert(pInfo->pSqlExpr != NULL); assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType; int32_t type = pInfo->pSqlExpr->resType;
......
...@@ -327,7 +327,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -327,7 +327,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
} }
for (int i = 0; i < pCmd->numOfCols; ++i){ for (int i = 0; i < pCmd->numOfCols; ++i){
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row; // pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
} else { } else {
...@@ -348,7 +348,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -348,7 +348,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) { for (int i = 0; i < pCmd->numOfCols; ++i) {
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
...@@ -405,11 +405,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -405,11 +405,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->code = code; pRes->code = code;
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p get tableMeta failed, code:%s", pSql, tstrerror(code)); tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error; goto _error;
} else { } else {
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
tscDebug("%p get %s successfully", pSql, msg); tscDebug("%p get %s successfully", pSql, msg);
} }
......
...@@ -239,7 +239,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -239,7 +239,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE}; TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
tstrncpy(f.name, "Field", sizeof(f.name)); tstrncpy(f.name, "Field", sizeof(f.name));
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false); (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false);
...@@ -485,7 +485,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const ...@@ -485,7 +485,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
tstrncpy(f.name, "Database", sizeof(f.name)); tstrncpy(f.name, "Database", sizeof(f.name));
} }
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
f.bytes, f.bytes - VARSTR_HEADER_SIZE, false); f.bytes, f.bytes - VARSTR_HEADER_SIZE, false);
...@@ -922,19 +922,17 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa ...@@ -922,19 +922,17 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pQueryInfo->order.order = TSDB_ORDER_ASC; pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo); tscFieldInfoClear(&pQueryInfo->fieldsInfo);
pQueryInfo->fieldsInfo.pFields = taosArrayInit(1, sizeof(TAOS_FIELD)); pQueryInfo->fieldsInfo.internalField = taosArrayInit(1, sizeof(SInternalField));
pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(1, sizeof(SFieldSupInfo));
TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength); TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength); tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, 0);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0); pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0);
memcpy(pRes->data, val, pField->bytes); memcpy(pRes->data, val, pInfo->field.bytes);
} }
int tscProcessLocalCmd(SSqlObj *pSql) { int tscProcessLocalCmd(SSqlObj *pSql) {
......
...@@ -510,7 +510,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -510,7 +510,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosTFree(pLocalReducer->pResultBuf); taosTFree(pLocalReducer->pResultBuf);
if (pLocalReducer->pResInfo != NULL) { if (pLocalReducer->pResInfo != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) {
taosTFree(pLocalReducer->pResInfo[i].interResultBuf); taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
} }
......
...@@ -122,7 +122,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql ...@@ -122,7 +122,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid);
/* /*
* Used during parsing query sql. Since the query sql usually small in length, error position * Used during parsing query sql. Since the query sql usually small in length, error position
...@@ -190,7 +190,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -190,7 +190,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!pInfo->valid) { if (!pInfo->valid || terrno == TSDB_CODE_TSC_SQL_SYNTAX_ERROR) {
terrno = TSDB_CODE_SUCCESS; // clear the error number
return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->pzErrMsg); return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->pzErrMsg);
} }
...@@ -1249,7 +1250,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1249,7 +1250,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
tExprNode* pNode = NULL; tExprNode* pNode = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex)); SArray* colList = taosArrayInit(10, sizeof(SColIndex));
int32_t ret = exprTreeFromSqlExpr(pCmd, &pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, colList); int32_t ret = exprTreeFromSqlExpr(pCmd, &pNode, pItem->pNode, pQueryInfo, colList, NULL);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosArrayDestroy(colList); taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL); tExprTreeDestroy(&pNode, NULL);
...@@ -1305,17 +1306,17 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1305,17 +1306,17 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, aliasName, NULL); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, aliasName, NULL);
int32_t slot = tscNumOfFields(pQueryInfo) - 1; int32_t slot = tscNumOfFields(pQueryInfo) - 1;
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, slot); SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot);
if (pInfo->pSqlExpr == NULL) { if (pInfo->pSqlExpr == NULL) {
SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo)); SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo));
// arithmetic expression always return result in the format of double float // arithmetic expression always return result in the format of double float
pArithExprInfo->bytes = sizeof(double); pArithExprInfo->bytes = sizeof(double);
pArithExprInfo->interBytes = sizeof(double); pArithExprInfo->interBytes = sizeof(double);
pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE;
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL); int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pArithExprInfo->pExpr, NULL); tExprTreeDestroy(&pArithExprInfo->pExpr, NULL);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause"); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause");
...@@ -1372,7 +1373,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) { ...@@ -1372,7 +1373,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL); tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols); SInternalField* pSupInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false; pSupInfo->visible = false;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
...@@ -1469,7 +1470,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi ...@@ -1469,7 +1470,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
} }
TAOS_FIELD f = tscCreateField(type, fieldName, bytes); TAOS_FIELD f = tscCreateField(type, fieldName, bytes);
SFieldSupInfo* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); SInternalField* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f);
pInfo->pSqlExpr = pSqlExpr; pInfo->pSqlExpr = pSqlExpr;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3384,10 +3385,26 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer ...@@ -3384,10 +3385,26 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer
tSQLExprItem item = {.pNode = pExpr, .aliasName = NULL}; tSQLExprItem item = {.pNode = pExpr, .aliasName = NULL};
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially // sql function list in selection clause.
// Append the sqlExpr into exprList of pQueryInfo structure sequentially
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false) != TSDB_CODE_SUCCESS) { if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
// It is invalid in case of more than one sqlExpr, such as first(ts, k) - last(ts, k)
int32_t inc = (int32_t) tscSqlExprNumOfExprs(pQueryInfo) - outputIndex;
if (inc > 1) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// Not supported data type in arithmetic expression
for(int32_t i = 0; i < inc; ++i) {
SSqlExpr* p1 = tscSqlExprGet(pQueryInfo, i + outputIndex);
int16_t t = p1->resType;
if (t == TSDB_DATA_TYPE_BINARY || t == TSDB_DATA_TYPE_NCHAR || t == TSDB_DATA_TYPE_BOOL || t == TSDB_DATA_TYPE_TIMESTAMP) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -4062,7 +4079,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE ...@@ -4062,7 +4079,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
tExprNode* p = NULL; tExprNode* p = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex)); SArray* colList = taosArrayInit(10, sizeof(SColIndex));
ret = exprTreeFromSqlExpr(pCmd, &p, p1, NULL, pQueryInfo, colList); ret = exprTreeFromSqlExpr(pCmd, &p, p1, pQueryInfo, colList, NULL);
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
TRY(0) { TRY(0) {
...@@ -4733,7 +4750,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4733,7 +4750,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// validate the length of binary // validate the length of binary
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) && if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
(pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) { varDataTLen(pAlterSQL->tagData.data) > pTagsSchema->bytes) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg14); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg14);
} }
...@@ -5259,26 +5276,6 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { ...@@ -5259,26 +5276,6 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex) {
// // the first column not timestamp column, add it
// SSqlExpr* pExpr = NULL;
// if (tscSqlExprNumOfExprs(pQueryInfo) > 0) {
// pExpr = tscSqlExprGet(pQueryInfo, 0);
// }
//
// if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) {
// SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
//
// pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
// pExpr->colInfo.flag = TSDB_COL_NORMAL;
//
// // NOTE: tag column does not add to source column list
// SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX);
//
// insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr);
// }
//}
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) { void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) {
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex); SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex);
...@@ -5335,7 +5332,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { ...@@ -5335,7 +5332,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
tscAddSpecialColumnForSelect(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL); tscAddSpecialColumnForSelect(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, (int32_t)size); SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, (int32_t)size);
doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr); doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr);
pInfo->visible = false; pInfo->visible = false;
} }
...@@ -6380,19 +6377,19 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -6380,19 +6377,19 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_SUCCESS; // Does not build query message here return TSDB_CODE_SUCCESS; // Does not build query message here
} }
int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) { int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid) {
tExprNode* pLeft = NULL; tExprNode* pLeft = NULL;
tExprNode* pRight= NULL; tExprNode* pRight= NULL;
if (pSqlExpr->pLeft != NULL) { if (pSqlExpr->pLeft != NULL) {
int32_t ret = exprTreeFromSqlExpr(pCmd, &pLeft, pSqlExpr->pLeft, pExprInfo, pQueryInfo, pCols); int32_t ret = exprTreeFromSqlExpr(pCmd, &pLeft, pSqlExpr->pLeft, pQueryInfo, pCols, uid);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
} }
if (pSqlExpr->pRight != NULL) { if (pSqlExpr->pRight != NULL) {
int32_t ret = exprTreeFromSqlExpr(pCmd, &pRight, pSqlExpr->pRight, pExprInfo, pQueryInfo, pCols); int32_t ret = exprTreeFromSqlExpr(pCmd, &pRight, pSqlExpr->pRight, pQueryInfo, pCols, uid);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -6419,14 +6416,19 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS ...@@ -6419,14 +6416,19 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
strncpy((*pExpr)->pSchema->name, pSqlExpr->operand.z, pSqlExpr->operand.n); strncpy((*pExpr)->pSchema->name, pSqlExpr->operand.z, pSqlExpr->operand.n);
// set the input column data byte and type. // set the input column data byte and type.
size_t size = taosArrayGetSize(pExprInfo); size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSqlExpr* p1 = taosArrayGetP(pExprInfo, i); SSqlExpr* p1 = taosArrayGetP(pQueryInfo->exprList, i);
if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) { if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) {
(*pExpr)->pSchema->type = (uint8_t)p1->resType; (*pExpr)->pSchema->type = (uint8_t)p1->resType;
(*pExpr)->pSchema->bytes = p1->resBytes; (*pExpr)->pSchema->bytes = p1->resBytes;
if (uid != NULL) {
*uid = p1->uid;
}
break; break;
} }
} }
......
...@@ -145,10 +145,11 @@ static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo ...@@ -145,10 +145,11 @@ static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo
corVgroupInfo->inUse = 0; corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps; corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); corVgroupInfo->epAddr[i].fqdn = strdup(vgroupInfo->epAddr[i].fqdn);
corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port;
} }
} }
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
assert(pTableMetaMsg != NULL); assert(pTableMetaMsg != NULL);
...@@ -162,9 +163,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size ...@@ -162,9 +163,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
.numOfColumns = pTableMetaMsg->numOfColumns, .numOfColumns = pTableMetaMsg->numOfColumns,
}; };
pTableMeta->id.tid = pTableMetaMsg->sid; pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid; pTableMeta->id.uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
SCMVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo;
pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps;
pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
SEpAddrMsg* pEpMsg = &pTableMetaMsg->vgroup.epAddr[i];
pVgroupInfo->epAddr[i].fqdn = strndup(pEpMsg->fqdn, tListLen(pEpMsg->fqdn));
pVgroupInfo->epAddr[i].port = pEpMsg->port;
}
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
......
...@@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
pVgroupInfo->inUse = pEpSet->inUse; pVgroupInfo->inUse = pEpSet->inUse;
pVgroupInfo->numOfEps = pEpSet->numOfEps; pVgroupInfo->numOfEps = pEpSet->numOfEps;
for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) { for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN); taosTFree(pVgroupInfo->epAddr[i].fqdn);
pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
pVgroupInfo->epAddr[i].port = pEpSet->port[i]; pVgroupInfo->epAddr[i].port = pEpSet->port[i];
} }
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version); taosCorEndWrite(&pVgroupInfo->version);
} }
...@@ -1648,7 +1650,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1648,7 +1650,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscProcessTableMetaRsp(SSqlObj *pSql) { int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->tid = htonl(pMetaMsg->tid);
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->tversion = htons(pMetaMsg->tversion);
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
...@@ -1658,9 +1660,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1658,9 +1660,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
(pMetaMsg->sid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId, tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
pMetaMsg->sid, pMetaMsg->tableId); pMetaMsg->tid, pMetaMsg->tableId);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -1839,22 +1841,30 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -1839,22 +1841,30 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SSqlCmd* pCmd = &parent->cmd; SSqlCmd* pCmd = &parent->cmd;
for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
SVgroupsInfo * pVgroupInfo = (SVgroupsInfo *)pMsg;
pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);
size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsMsg * pVgroupMsg = (SVgroupsMsg *) pMsg;
pInfo->vgroupList = calloc(1, size); pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);
size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
pInfo->vgroupList = calloc(1, vgroupsz);
assert(pInfo->vgroupList != NULL); assert(pInfo->vgroupList != NULL);
memcpy(pInfo->vgroupList, pVgroupInfo, size); pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
//just init, no need to lock //just init, no need to lock
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfEps >= 1); SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
pVgroups->vgId = htonl(vmsg->vgId);
pVgroups->numOfEps = vmsg->numOfEps;
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
} }
} }
...@@ -1890,7 +1900,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1890,7 +1900,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
pSchema = pMetaMsg->schema; pSchema = pMetaMsg->schema;
pMetaMsg->sid = ntohs(pMetaMsg->sid); pMetaMsg->tid = ntohs(pMetaMsg->tid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htons(pSchema->bytes);
pSchema++; pSchema++;
...@@ -1924,7 +1934,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1924,7 +1934,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
tscColumnListInsert(pQueryInfo->colList, &index); tscColumnListInsert(pQueryInfo->colList, &index);
TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
......
...@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) { ...@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
size_t numOfCols = tscNumOfFields(pQueryInfo); size_t numOfCols = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SFieldSupInfo* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); SInternalField* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pInfo->visible) { if (pInfo->visible) {
num++; num++;
} }
...@@ -409,8 +409,24 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -409,8 +409,24 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (numOfCols == 0) { if (numOfCols == 0) {
return NULL; return NULL;
} }
return pQueryInfo->fieldsInfo.pFields->pData; SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->final == NULL) {
TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD));
int32_t j = 0;
for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(pFieldInfo, i);
if (pField->visible) {
f[j++] = pField->field;
}
}
pFieldInfo->final = f;
}
return pFieldInfo->final;
} }
int taos_retrieve(TAOS_RES *res) { int taos_retrieve(TAOS_RES *res) {
......
...@@ -168,8 +168,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -168,8 +168,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true);
taosTFree(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; return;
} }
...@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
taosTFree(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
} }
......
...@@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr ...@@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo info = {{0}}; SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) { if (tt->vgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m]; tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
break; break;
} }
} }
...@@ -1142,7 +1142,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); ...@@ -1142,7 +1142,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// TODO
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -1298,14 +1297,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1298,14 +1297,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// todo add test
// SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
// if (pState == NULL) {
// code = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
// }
pSql->subState.numOfSub = pQueryInfo->numOfTables; pSql->subState.numOfSub = pQueryInfo->numOfTables;
bool hasEmptySub = false; bool hasEmptySub = false;
...@@ -1645,9 +1636,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1645,9 +1636,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num); uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList;
pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
numOfRowsFromSubquery, idx); vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
...@@ -2022,7 +2013,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2022,7 +2013,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex); SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);
assert(pInfo->pSqlExpr != NULL); assert(pInfo->pSqlExpr != NULL);
*bytes = pInfo->pSqlExpr->resBytes; *bytes = pInfo->pSqlExpr->resBytes;
...@@ -2172,7 +2163,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { ...@@ -2172,7 +2163,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
size_t size = tscNumOfFields(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
SFieldSupInfo* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, i); SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} }
...@@ -2182,7 +2173,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { ...@@ -2182,7 +2173,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
continue; continue;
} }
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pFields, i); TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) { if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField); transferNcharData(pSql, i, pField);
} }
......
...@@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon ...@@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
return 0; return 0;
} }
void taos_init_imp(void) { void taos_init_imp(void) {
char temp[128]; char temp[128];
...@@ -124,8 +125,9 @@ void taos_init_imp(void) { ...@@ -124,8 +125,9 @@ void taos_init_imp(void) {
double factor = (tscEmbedded == 0)? 2.0:4.0; double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) {
if (tscNumOfThreads < 2) tscNumOfThreads = 2; tscNumOfThreads = 2;
}
tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
if (NULL == tscQhandle) { if (NULL == tscQhandle) {
...@@ -140,7 +142,7 @@ void taos_init_imp(void) { ...@@ -140,7 +142,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
} }
......
...@@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) {
} }
} }
void tscFreeTableMetaHelper(void *pTableMeta) {
STableMeta* p = (STableMeta*) pTableMeta;
int32_t numOfEps = p->vgroupInfo.numOfEps;
assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps; ++i) {
taosTFree(p->vgroupInfo.epAddr[i].fqdn);
}
int32_t numOfEps1 = p->corVgroupInfo.numOfEps;
assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps1; ++i) {
taosTFree(p->corVgroupInfo.epAddr[i].fqdn);
}
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
...@@ -829,35 +847,30 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) { ...@@ -829,35 +847,30 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return f; return f;
} }
SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) { SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
assert(pFieldInfo != NULL); assert(pFieldInfo != NULL);
taosArrayPush(pFieldInfo->pFields, pField);
pFieldInfo->numOfOutput++; pFieldInfo->numOfOutput++;
struct SFieldSupInfo info = { struct SInternalField info = {
.pSqlExpr = NULL, .pSqlExpr = NULL,
.pArithExprInfo = NULL, .pArithExprInfo = NULL,
.visible = true, .visible = true,
}; };
return taosArrayPush(pFieldInfo->pSupportInfo, &info);
}
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) { info.field = *pField;
return TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, index); return taosArrayPush(pFieldInfo->internalField, &info);
} }
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) { SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) {
taosArrayInsert(pFieldInfo->pFields, index, field);
pFieldInfo->numOfOutput++; pFieldInfo->numOfOutput++;
struct SInternalField info = {
struct SFieldSupInfo info = {
.pSqlExpr = NULL, .pSqlExpr = NULL,
.pArithExprInfo = NULL, .pArithExprInfo = NULL,
.visible = true, .visible = true,
}; };
return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info); info.field = *field;
return taosArrayInsert(pFieldInfo->internalField, index, &info);
} }
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
...@@ -891,29 +904,18 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { ...@@ -891,29 +904,18 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
} }
} }
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) { SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index) {
dst->numOfOutput = src->numOfOutput; assert(index < pFieldInfo->numOfOutput);
return TARRAY_GET_ELEM(pFieldInfo->internalField, index);
if (dst->pFields == NULL) {
dst->pFields = taosArrayClone(src->pFields);
} else {
taosArrayCopy(dst->pFields, src->pFields);
}
if (dst->pSupportInfo == NULL) {
dst->pSupportInfo = taosArrayClone(src->pSupportInfo);
} else {
taosArrayCopy(dst->pSupportInfo, src->pSupportInfo);
}
} }
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) { TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
assert(index < pFieldInfo->numOfOutput); assert(index < pFieldInfo->numOfOutput);
return TARRAY_GET_ELEM(pFieldInfo->pFields, index); return &((SInternalField*)TARRAY_GET_ELEM(pFieldInfo->internalField, index))->field;
} }
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index); SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, index);
assert(pInfo != NULL && pInfo->pSqlExpr != NULL); assert(pInfo != NULL && pInfo->pSqlExpr != NULL);
return pInfo->pSqlExpr->offset; return pInfo->pSqlExpr->offset;
...@@ -960,10 +962,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -960,10 +962,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
return; return;
} }
taosArrayDestroy(pFieldInfo->pFields);
for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) { for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i); SInternalField* pInfo = taosArrayGet(pFieldInfo->internalField, i);
if (pInfo->pArithExprInfo != NULL) { if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL); tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
...@@ -971,7 +971,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -971,7 +971,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
} }
} }
taosArrayDestroy(pFieldInfo->pSupportInfo); taosArrayDestroy(pFieldInfo->internalField);
taosTFree(pFieldInfo->final);
memset(pFieldInfo, 0, sizeof(SFieldInfo)); memset(pFieldInfo, 0, sizeof(SFieldInfo));
} }
...@@ -1615,11 +1617,8 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i ...@@ -1615,11 +1617,8 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
} }
void tscInitQueryInfo(SQueryInfo* pQueryInfo) { void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
assert(pQueryInfo->fieldsInfo.pFields == NULL); assert(pQueryInfo->fieldsInfo.internalField == NULL);
pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD)); pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField));
assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL);
pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo));
assert(pQueryInfo->exprList == NULL); assert(pQueryInfo->exprList == NULL);
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
...@@ -1682,8 +1681,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1682,8 +1681,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
void tscFreeVgroupTableInfo(SArray* pVgroupTables) { void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables != NULL) { if (pVgroupTables != NULL) {
for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) { size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList); taosArrayDestroy(pInfo->itemList);
} }
taosArrayDestroy(pVgroupTables); taosArrayDestroy(pVgroupTables);
...@@ -1695,6 +1700,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem ...@@ -1695,6 +1700,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo); free(pTableMetaInfo);
...@@ -1727,13 +1733,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1727,13 +1733,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo->pTableMeta = pTableMeta; pTableMetaInfo->pTableMeta = pTableMeta;
if (vgroupList != NULL) { if (vgroupList != NULL) {
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
pTableMetaInfo->vgroupList = malloc(size);
if (pTableMetaInfo->vgroupList == NULL) {
return NULL;
}
memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
} }
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES); pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
...@@ -1762,8 +1762,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) ...@@ -1762,8 +1762,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
} }
taosTFree(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList); tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL; pTableMetaInfo->tagColList = NULL;
} }
...@@ -1831,54 +1830,23 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1831,54 +1830,23 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return pNew; return pNew;
} }
// current sql function is not direct output result, so create a dummy output field
static void doSetNewFieldInfo(SQueryInfo* pNewQueryInfo, SSqlExpr* pExpr) {
TAOS_FIELD f = {.type = (uint8_t)pExpr->resType, .bytes = pExpr->resBytes};
tstrncpy(f.name, pExpr->aliasName, sizeof(f.name));
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, &f);
pInfo1->pSqlExpr = pExpr;
pInfo1->visible = false;
}
static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) { static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) {
int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo); int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo);
if (numOfOutput == 0) { if (numOfOutput == 0) {
return; return;
} }
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); // set the field info in pNewQueryInfo object according to sqlExpr information
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; size_t numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
// set the field info in pNewQueryInfo object
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, i);
if (pExpr->uid == uid) { TAOS_FIELD f = tscCreateField((int8_t) pExpr->resType, pExpr->aliasName, pExpr->resBytes);
if (i < pFieldInfo->numOfOutput) { SInternalField* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, &f);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i); pInfo1->pSqlExpr = pExpr;
if (pInfo->pSqlExpr != NULL) {
TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
assert(strcmp(p->name, pExpr->aliasName) == 0);
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
*pInfo1 = *pInfo;
} else {
assert(pInfo->pArithExprInfo != NULL);
doSetNewFieldInfo(pNewQueryInfo, pExpr);
}
} else { // it is a arithmetic column, does not have actual field for sqlExpr, so build it
doSetNewFieldInfo(pNewQueryInfo, pExpr);
}
}
} }
// make sure the the sqlExpr for each fields is correct // update the pSqlExpr pointer in SInternalField according the field name
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
// update the pSqlExpr pointer in SFieldSupInfo according the field name
// make sure the pSqlExpr point to the correct SqlExpr in pNewQueryInfo, not SqlExpr in pQueryInfo // make sure the pSqlExpr point to the correct SqlExpr in pNewQueryInfo, not SqlExpr in pQueryInfo
for (int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { for (int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
...@@ -1888,7 +1856,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p ...@@ -1888,7 +1856,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f); SInternalField* pInfo = tscFieldInfoGetInternalField(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1; pInfo->pSqlExpr = pExpr1;
matched = true; matched = true;
...@@ -2403,3 +2371,58 @@ void tscClearSqlOwner(SSqlObj* pSql) { ...@@ -2403,3 +2371,58 @@ void tscClearSqlOwner(SSqlObj* pSql) {
assert(taosCheckPthreadValid(pSql->owner)); assert(taosCheckPthreadValid(pSql->owner));
atomic_store_64(&pSql->owner, 0); atomic_store_64(&pSql->owner, 0);
} }
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
if (vgroupList == NULL) {
return NULL;
}
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
SVgroupsInfo* pNew = calloc(1, size);
if (pNew == NULL) {
return NULL;
}
pNew->numOfVgroups = vgroupList->numOfVgroups;
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i];
pNewVInfo->vgId = pvInfo->vgId;
pNewVInfo->numOfEps = pvInfo->numOfEps;
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn);
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
}
}
return pNew;
}
void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
if (vgroupList == NULL) {
return NULL;
}
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
taosTFree(pVgroupInfo->epAddr[j].fqdn);
}
}
taosTFree(vgroupList);
return NULL;
}
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
dst->epAddr[i].port = src->epAddr[i].port;
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
}
...@@ -173,15 +173,15 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -173,15 +173,15 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
return rpcRsp.code; return rpcRsp.code;
} }
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
dDebug("vgId:%d, sid:%d send config table msg to mnode", vgId, sid); dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid);
int32_t contLen = sizeof(SDMConfigTableMsg); int32_t contLen = sizeof(SDMConfigTableMsg);
SDMConfigTableMsg *pMsg = rpcMallocCont(contLen); SDMConfigTableMsg *pMsg = rpcMallocCont(contLen);
pMsg->dnodeId = htonl(dnodeGetDnodeId()); pMsg->dnodeId = htonl(dnodeGetDnodeId());
pMsg->vgId = htonl(vgId); pMsg->vgId = htonl(vgId);
pMsg->sid = htonl(sid); pMsg->tid = htonl(tid);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg; rpcMsg.pCont = pMsg;
...@@ -194,18 +194,18 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { ...@@ -194,18 +194,18 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
dError("vgId:%d, sid:%d failed to config table from mnode", vgId, sid); dError("vgId:%d, tid:%d failed to config table from mnode", vgId, tid);
return NULL; return NULL;
} else { } else {
dInfo("vgId:%d, sid:%d config table msg is received", vgId, sid); dInfo("vgId:%d, tid:%d config table msg is received", vgId, tid);
// delete this after debug finished // delete this after debug finished
SMDCreateTableMsg *pTable = rpcRsp.pCont; SMDCreateTableMsg *pTable = rpcRsp.pCont;
int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags); int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid); int32_t tableId = htonl(pTable->tid);
uint64_t uid = htobe64(pTable->uid); uint64_t uid = htobe64(pTable->uid);
dInfo("table:%s, numOfColumns:%d numOfTags:%d sid:%d uid:%" PRIu64, pTable->tableId, numOfColumns, numOfTags, sid, uid); dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableId, numOfColumns, numOfTags, tableId, uid);
return rpcRsp.pCont; return rpcRsp.pCont;
} }
......
...@@ -212,7 +212,8 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -212,7 +212,8 @@ static void *dnodeProcessReadQueue(void *param) {
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_NOT_READY, do not return msg to client } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
} }
} }
......
...@@ -49,7 +49,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); ...@@ -49,7 +49,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocateVnodeWqueue(void *pVnode); void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVnodeWqueue(void *queue);
......
...@@ -182,10 +182,16 @@ extern char *taosMsg[]; ...@@ -182,10 +182,16 @@ extern char *taosMsg[];
#pragma pack(push, 1) #pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
typedef struct { typedef struct {
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
} SEpAddr; } SEpAddrMsg;
typedef struct {
char* fqdn;
uint16_t port;
} SEpAddr1;
typedef struct { typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
...@@ -245,7 +251,7 @@ typedef struct { ...@@ -245,7 +251,7 @@ typedef struct {
int8_t tableType; int8_t tableType;
int16_t numOfColumns; int16_t numOfColumns;
int16_t numOfTags; int16_t numOfTags;
int32_t sid; int32_t tid;
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
int32_t tagDataLen; int32_t tagDataLen;
...@@ -354,7 +360,7 @@ typedef struct { ...@@ -354,7 +360,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
int32_t sid; int32_t tid;
uint64_t uid; uint64_t uid;
char tableId[TSDB_TABLE_FNAME_LEN]; char tableId[TSDB_TABLE_FNAME_LEN];
} SMDDropTableMsg; } SMDDropTableMsg;
...@@ -401,6 +407,7 @@ typedef struct SExprInfo { ...@@ -401,6 +407,7 @@ typedef struct SExprInfo {
int16_t bytes; int16_t bytes;
int16_t type; int16_t type;
int32_t interBytes; int32_t interBytes;
int64_t uid;
} SExprInfo; } SExprInfo;
typedef struct SColumnFilterInfo { typedef struct SColumnFilterInfo {
...@@ -662,16 +669,27 @@ typedef struct SCMSTableVgroupMsg { ...@@ -662,16 +669,27 @@ typedef struct SCMSTableVgroupMsg {
} SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA]; SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCMVgroupInfo; } SCMVgroupInfo;
typedef struct {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SCMVgroupMsg;
typedef struct { typedef struct {
int32_t numOfVgroups; int32_t numOfVgroups;
SCMVgroupInfo vgroups[]; SCMVgroupInfo vgroups[];
} SVgroupsInfo; } SVgroupsInfo;
typedef struct {
int32_t numOfVgroups;
SCMVgroupMsg vgroups[];
} SVgroupsMsg;
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int32_t contLen; int32_t contLen;
char tableId[TSDB_TABLE_FNAME_LEN]; // table id char tableId[TSDB_TABLE_FNAME_LEN]; // table id
...@@ -682,9 +700,9 @@ typedef struct STableMetaMsg { ...@@ -682,9 +700,9 @@ typedef struct STableMetaMsg {
int16_t numOfColumns; int16_t numOfColumns;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
int32_t sid; int32_t tid;
uint64_t uid; uint64_t uid;
SCMVgroupInfo vgroup; SCMVgroupMsg vgroup;
SSchema schema[]; SSchema schema[];
} STableMetaMsg; } STableMetaMsg;
...@@ -730,7 +748,7 @@ typedef struct { ...@@ -730,7 +748,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t vgId; int32_t vgId;
int32_t sid; int32_t tid;
} SDMConfigTableMsg; } SDMConfigTableMsg;
typedef struct { typedef struct {
......
...@@ -115,7 +115,7 @@ typedef struct { ...@@ -115,7 +115,7 @@ typedef struct {
uint64_t suid; uint64_t suid;
int64_t createdTime; int64_t createdTime;
int32_t numOfColumns; //used by normal table int32_t numOfColumns; //used by normal table
int32_t sid; int32_t tid;
int32_t vgId; int32_t vgId;
int32_t sqlLen; int32_t sqlLen;
int8_t updateEnd[4]; int8_t updateEnd[4];
......
...@@ -303,7 +303,7 @@ static int32_t mnodeChildTableActionRestored() { ...@@ -303,7 +303,7 @@ static int32_t mnodeChildTableActionRestored() {
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgId:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid);
pTable->vgId = 0; pTable->vgId = 0;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
...@@ -314,7 +314,7 @@ static int32_t mnodeChildTableActionRestored() { ...@@ -314,7 +314,7 @@ static int32_t mnodeChildTableActionRestored() {
if (strcmp(pVgroup->dbName, pDb->name) != 0) { if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid);
pTable->vgId = 0; pTable->vgId = 0;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
...@@ -771,8 +771,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { ...@@ -771,8 +771,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
return mnodeProcessDropSuperTableMsg(pMsg); return mnodeProcessDropSuperTableMsg(pMsg);
} else { } else {
SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable;
mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d tid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid); pDrop->tableId, pCTable->vgId, pCTable->tid, pCTable->uid);
return mnodeProcessDropChildTableMsg(pMsg); return mnodeProcessDropChildTableMsg(pMsg);
} }
} }
...@@ -1476,13 +1476,14 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1476,13 +1476,14 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
int32_t numOfTable = htonl(pInfo->numOfTables); int32_t numOfTable = htonl(pInfo->numOfTables);
// reserve space // reserve space
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo); int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg);
for (int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i; char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName); SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable != NULL && pTable->vgHash != NULL) { if (pTable != NULL && pTable->vgHash != NULL) {
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo)); contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg));
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
} }
...@@ -1510,12 +1511,12 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1510,12 +1511,12 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
// even this super table has no corresponding table, still return // even this super table has no corresponding table, still return
pRsp->numOfTables++; pRsp->numOfTables++;
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
pVgroupInfo->numOfVgroups = 0; pVgroupMsg->numOfVgroups = 0;
msg += sizeof(SVgroupsInfo); msg += sizeof(SVgroupsMsg);
} else { } else {
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash); SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
int32_t vgSize = 0; int32_t vgSize = 0;
...@@ -1524,15 +1525,17 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1524,15 +1525,17 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SVgObj * pVgroup = mnodeGetVgroup(*pVgId); SVgObj * pVgroup = mnodeGetVgroup(*pVgId);
if (pVgroup == NULL) continue; if (pVgroup == NULL) continue;
pVgroupInfo->vgroups[vgSize].vgId = htonl(pVgroup->vgId); pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
pVgroupMsg->vgroups[vgSize].numOfEps = 0;
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break; if (pDnode == NULL) break;
tstrncpy(pVgroupInfo->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
pVgroupInfo->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
pVgroupInfo->vgroups[vgSize].numOfEps++; pVgroupMsg->vgroups[vgSize].numOfEps++;
} }
vgSize++; vgSize++;
...@@ -1542,10 +1545,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1542,10 +1545,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
pVgroupInfo->numOfVgroups = htonl(vgSize); pVgroupMsg->numOfVgroups = htonl(vgSize);
// one table is done, try the next table // one table is done, try the next table
msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo); msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SCMVgroupMsg);
pRsp->numOfTables++; pRsp->numOfTables++;
} }
} }
...@@ -1595,7 +1598,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO ...@@ -1595,7 +1598,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
pCreate->vgId = htonl(pTable->vgId); pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->info.type; pCreate->tableType = pTable->info.type;
pCreate->createdTime = htobe64(pTable->createdTime); pCreate->createdTime = htobe64(pTable->createdTime);
pCreate->sid = htonl(pTable->sid); pCreate->tid = htonl(pTable->tid);
pCreate->sqlDataLen = htonl(pTable->sqlLen); pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreate->uid = htobe64(pTable->uid); pCreate->uid = htobe64(pTable->uid);
...@@ -1644,7 +1647,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { ...@@ -1644,7 +1647,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
assert(pTable); assert(pTable);
mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid);
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
...@@ -1686,7 +1689,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1686,7 +1689,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else { } else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
return code; return code;
...@@ -1710,7 +1713,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -1710,7 +1713,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pTable->info.tableId = strdup(pCreate->tableId); pTable->info.tableId = strdup(pCreate->tableId);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->sid = tid; pTable->tid = tid;
pTable->vgId = pVgroup->vgId; pTable->vgId = pVgroup->vgId;
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
...@@ -1724,7 +1727,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -1724,7 +1727,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
} }
pTable->suid = pMsg->pSTable->uid; pTable->suid = pMsg->pSTable->uid;
pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->tid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
pTable->superTable = pMsg->pSTable; pTable->superTable = pMsg->pSTable;
} else { } else {
...@@ -1732,7 +1735,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -1732,7 +1735,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
int64_t us = taosGetTimestampUs(); int64_t us = taosGetTimestampUs();
pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
} else { } else {
pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->tid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
} }
...@@ -1789,7 +1792,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -1789,7 +1792,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
tstrerror(code)); tstrerror(code));
} else { } else {
mDebug("app:%p:%p, table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, mDebug("app:%p:%p, table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid); pTable->info.tableId, pVgroup->vgId, pTable->tid, pTable->uid);
} }
return code; return code;
...@@ -1807,8 +1810,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1807,8 +1810,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->retry == 0) { if (pMsg->retry == 0) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t sid = 0; int32_t tid = 0;
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &sid); code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mDebug("app:%p:%p, table:%s, failed to get available vgroup, reason:%s", pMsg->rpcMsg.ahandle, pMsg, mDebug("app:%p:%p, table:%s, failed to get available vgroup, reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pCreate->tableId, tstrerror(code)); pCreate->tableId, tstrerror(code));
...@@ -1822,7 +1825,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1822,7 +1825,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
pMsg->pVgroup = pVgroup; pMsg->pVgroup = pVgroup;
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
return mnodeDoCreateChildTable(pMsg, sid); return mnodeDoCreateChildTable(pMsg, tid);
} }
} else { } else {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId); if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId);
...@@ -1852,13 +1855,13 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { ...@@ -1852,13 +1855,13 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
tstrncpy(pDrop->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN); tstrncpy(pDrop->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
pDrop->vgId = htonl(pTable->vgId); pDrop->vgId = htonl(pTable->vgId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->sid = htonl(pTable->sid); pDrop->tid = htonl(pTable->tid);
pDrop->uid = htobe64(pTable->uid); pDrop->uid = htobe64(pTable->uid);
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
mInfo("app:%p:%p, ctable:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, mInfo("app:%p:%p, ctable:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); pDrop->tableId, pTable->vgId, pTable->tid, pTable->uid);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = pMsg, .ahandle = pMsg,
...@@ -2097,7 +2100,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -2097,7 +2100,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid); pMeta->tid = htonl(pTable->tid);
pMeta->precision = pDb->cfg.precision; pMeta->precision = pDb->cfg.precision;
pMeta->tableType = pTable->info.type; pMeta->tableType = pTable->info.type;
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN); tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
...@@ -2137,7 +2140,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -2137,7 +2140,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid); pTable->info.tableId, pTable->uid, pTable->vgId, pTable->tid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2289,11 +2292,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) { ...@@ -2289,11 +2292,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) {
} }
#if 0 #if 0
static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) { static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) {
SVgObj *pVgroup = mnodeGetVgroup(vnode); SVgObj *pVgroup = mnodeGetVgroup(vnode);
if (pVgroup == NULL) return NULL; if (pVgroup == NULL) return NULL;
SChildTableObj *pTable = pVgroup->tableList[sid - 1]; SChildTableObj *pTable = pVgroup->tableList[tid - 1];
mnodeIncTableRef((STableObj *)pTable); mnodeIncTableRef((STableObj *)pTable);
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
...@@ -2341,12 +2344,12 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2341,12 +2344,12 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
assert(pTable); assert(pTable);
mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s", mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) { if (rpcMsg->code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s", mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
tstrerror(rpcMsg->code)); tstrerror(rpcMsg->code));
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
return; return;
...@@ -2384,7 +2387,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2384,7 +2387,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode // If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) { if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64, mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64,
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid);
// if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId // if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId
// so the refCount of vgroup can not be decreased // so the refCount of vgroup can not be decreased
...@@ -2419,13 +2422,13 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2419,13 +2422,13 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->retry++ < 10) { if (mnodeMsg->retry++ < 10) {
mDebug("app:%p:%p, table:%s, create table rsp received, need retry, times:%d vgId:%d sid:%d uid:%" PRIu64 mDebug("app:%p:%p, table:%s, create table rsp received, need retry, times:%d vgId:%d sid:%d uid:%" PRIu64
" result:%s thandle:%p", " result:%s thandle:%p",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, pTable->vgId, pTable->sid, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, pTable->vgId, pTable->tid,
pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
} else { } else {
mError("app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%" PRIu64 ", result:%s thandle:%p", mError("app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%" PRIu64 ", result:%s thandle:%p",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable};
...@@ -2680,7 +2683,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2680,7 +2683,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
// tid // tid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*) pWrite = pTable->sid; *(int32_t*) pWrite = pTable->tid;
cols++; cols++;
//vgid //vgid
......
...@@ -793,12 +793,12 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -793,12 +793,12 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool); int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool);
if (pTable->sid > idPoolSize) { if (pTable->tid > idPoolSize) {
mnodeAllocVgroupIdPool(pVgroup); mnodeAllocVgroupIdPool(pVgroup);
} }
if (pTable->sid >= 1) { if (pTable->tid >= 1) {
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); taosIdPoolMarkStatus(pVgroup->idPool, pTable->tid);
pVgroup->numOfTables++; pVgroup->numOfTables++;
// The create vgroup message may be received later than the create table message // The create vgroup message may be received later than the create table message
// and the writing order in sdb is therefore uncertain // and the writing order in sdb is therefore uncertain
...@@ -808,8 +808,8 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { ...@@ -808,8 +808,8 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
} }
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
if (pTable->sid >= 1) { if (pTable->tid >= 1) {
taosFreeId(pVgroup->idPool, pTable->sid); taosFreeId(pVgroup->idPool, pTable->tid);
pVgroup->numOfTables--; pVgroup->numOfTables--;
// The create vgroup message may be received later than the create table message // The create vgroup message may be received later than the create table message
// and the writing order in sdb is therefore uncertain // and the writing order in sdb is therefore uncertain
......
...@@ -213,6 +213,7 @@ typedef struct SQInfo { ...@@ -213,6 +213,7 @@ typedef struct SQInfo {
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
tsem_t ready;
int32_t dataReady; // denote if query result is ready or not int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context void* rspContext; // response context
} SQInfo; } SQInfo;
......
...@@ -276,8 +276,6 @@ tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr); ...@@ -276,8 +276,6 @@ tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr);
SAlterTableSQL *tAlterTableSQLElems(SStrToken *pMeterName, tFieldList *pCols, tVariantList *pVals, int32_t type); SAlterTableSQL *tAlterTableSQLElems(SStrToken *pMeterName, tFieldList *pCols, tVariantList *pVals, int32_t type);
tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList);
void destroyAllSelectClause(SSubclauseInfo *pSql); void destroyAllSelectClause(SSubclauseInfo *pSql);
void doDestroyQuerySql(SQuerySQL *pSql); void doDestroyQuerySql(SQuerySQL *pSql);
......
...@@ -646,9 +646,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p ...@@ -646,9 +646,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
} }
// handle the leaf node // handle the leaf node
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
param->setupInfoFn(pExpr, param->pExtInfo); param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info); return param->nodeFilterFn(pItem, pExpr->_node.info);
} }
...@@ -769,6 +767,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -769,6 +767,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
assert(taosArrayGetSize(result) == 0); assert(taosArrayGetSize(result) == 0);
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
} }
return; return;
} }
......
...@@ -6227,7 +6227,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6227,7 +6227,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info // NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL;
pthread_mutex_init(&pQInfo->lock, NULL); pthread_mutex_init(&pQInfo->lock, NULL);
tsem_init(&pQInfo->ready, 0, 0);
pQuery->pos = -1; pQuery->pos = -1;
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
...@@ -6692,12 +6694,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -6692,12 +6694,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pQInfo->dataReady = QUERY_RESULT_READY; pQInfo->dataReady = QUERY_RESULT_READY;
buildRes = (pQInfo->rspContext != NULL); buildRes = (pQInfo->rspContext != NULL);
pthread_mutex_unlock(&pQInfo->lock); // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed.
// clear qhandle owner
assert(pQInfo->owner == taosGetPthreadId()); assert(pQInfo->owner == taosGetPthreadId());
pQInfo->owner = 0; pQInfo->owner = 0;
pthread_mutex_unlock(&pQInfo->lock);
tsem_post(&pQInfo->ready);
return buildRes; return buildRes;
} }
...@@ -6761,18 +6765,24 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6761,18 +6765,24 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
qError("QInfo:%p invalid qhandle", pQInfo);
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
*buildRes = false; *buildRes = false;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
return pQInfo->code; return pQInfo->code;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
#if 0
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) { if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true; *buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
...@@ -6781,10 +6791,17 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6781,10 +6791,17 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
*buildRes = false; *buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext; pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
} }
code = pQInfo->code; code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
#else
tsem_wait(&pQInfo->ready);
*buildRes = true;
code = pQInfo->code;
#endif
return code; return code;
} }
......
...@@ -130,13 +130,15 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) { ...@@ -130,13 +130,15 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
tVariantCreate(&pSQLExpr->val, pToken); tVariantCreate(&pSQLExpr->val, pToken);
pSQLExpr->nSQLOptr = optrType; pSQLExpr->nSQLOptr = optrType;
} else if (optrType == TK_NOW) { } else if (optrType == TK_NOW) {
// default use microsecond // use microsecond by default
pSQLExpr->val.i64Key = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO); pSQLExpr->val.i64Key = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO);
pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT; pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT;
pSQLExpr->nSQLOptr = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond pSQLExpr->nSQLOptr = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond
} else if (optrType == TK_VARIABLE) { } else if (optrType == TK_VARIABLE) {
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSQLExpr->val.i64Key); int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSQLExpr->val.i64Key);
UNUSED(ret); if (ret != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT; pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT;
pSQLExpr->nSQLOptr = TK_TIMESTAMP; pSQLExpr->nSQLOptr = TK_TIMESTAMP;
...@@ -148,6 +150,7 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) { ...@@ -148,6 +150,7 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
pSQLExpr->nSQLOptr = optrType; pSQLExpr->nSQLOptr = optrType;
} }
return pSQLExpr; return pSQLExpr;
} }
...@@ -532,26 +535,6 @@ SQuerySQL *tSetQuerySQLElems(SStrToken *pSelectToken, tSQLExprList *pSelection, ...@@ -532,26 +535,6 @@ SQuerySQL *tSetQuerySQLElems(SStrToken *pSelectToken, tSQLExprList *pSelection,
return pQuery; return pQuery;
} }
tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList) {
if (pList == NULL) pList = calloc(1, sizeof(tSQLExprListList));
if (pList->nAlloc <= pList->nList) { //
pList->nAlloc = (pList->nAlloc << 1) + 4;
pList->a = realloc(pList->a, pList->nAlloc * sizeof(pList->a[0]));
if (pList->a == 0) {
pList->nList = pList->nAlloc = 0;
return pList;
}
}
assert(pList->a != 0);
if (pExprList) {
pList->a[pList->nList++] = pExprList;
}
return pList;
}
void doDestroyQuerySql(SQuerySQL *pQuerySql) { void doDestroyQuerySql(SQuerySQL *pQuerySql) {
if (pQuerySql == NULL) { if (pQuerySql == NULL) {
return; return;
......
...@@ -365,7 +365,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -365,7 +365,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
taosTFree(pBucket); taosTFree(pBucket);
} }
void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) {
switch (dataType) { switch (dataType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t val = *(int32_t *)data; int32_t val = *(int32_t *)data;
......
...@@ -1247,7 +1247,10 @@ _bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t ...@@ -1247,7 +1247,10 @@ _bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t
case TSDB_BINARY_OP_REMAINDER: case TSDB_BINARY_OP_REMAINDER:
return rem_function_arraylist[leftType][rightType]; return rem_function_arraylist[leftType][rightType];
default: default:
assert(0);
return NULL; return NULL;
} }
assert(0);
return NULL; return NULL;
} }
...@@ -412,7 +412,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -412,7 +412,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcLockConn(pConn); rpcLockConn(pConn);
if ( pConn->inType == 0 || pConn->user[0] == 0 ) { if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tDebug("%s, connection is already released, rsp wont be sent", pConn->info); tError("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
rpcDecRef(pRpc); rpcDecRef(pRpc);
......
...@@ -239,7 +239,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { ...@@ -239,7 +239,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
return NULL; return NULL;
} }
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->tid)) < 0) goto _err;
if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) { if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -14,7 +14,8 @@ ...@@ -14,7 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
//#include <dnode.h> #define _NON_BLOCKING_RETRIEVE 0
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -92,12 +93,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pRead) { ...@@ -92,12 +93,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pRead) {
} }
} }
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
pRead->rpcMsg.handle = NULL; pRead->rpcMsg.ahandle = ahandle;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
...@@ -105,14 +106,23 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { ...@@ -105,14 +106,23 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
} }
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) { /**
*
* @param pRet response message object
* @param pVnode the vnode object
* @param handle qhandle for executing query
* @param freeHandle free qhandle or not
* @param ahandle sqlObj address at client side
* @return
*/
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) {
bool continueExec = false; bool continueExec = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
*freeHandle = false; *freeHandle = false;
vnodePutItemIntoReadQueue(pVnode, handle); vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
} else { } else {
*freeHandle = true; *freeHandle = true;
...@@ -190,7 +200,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -190,7 +200,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
if (handle == NULL) { // failed to register qhandle, todo add error test case if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno; pRsp->code = terrno;
terrno = 0; terrno = 0;
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
...@@ -216,7 +226,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -216,7 +226,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, handle); vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
} }
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
...@@ -224,6 +234,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -224,6 +234,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
#if _NON_BLOCKING_RETRIEVE
bool freehandle = false; bool freehandle = false;
bool buildRes = qTableQuery(*qhandle); // do execute query bool buildRes = qTableQuery(*qhandle); // do execute query
...@@ -237,11 +249,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -237,11 +249,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pReadMsg->rpcMsg.handle); pReadMsg->rpcMsg.handle);
// set the real rsp error code // set the real rsp error code
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP; code = TSDB_CODE_QRY_HAS_RSP;
} else { } else {
void* h1 = qGetResultRetrieveMsg(*qhandle);
assert(h1 == NULL);
freehandle = qQueryCompleted(*qhandle); freehandle = qQueryCompleted(*qhandle);
} }
...@@ -250,6 +265,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -250,6 +265,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (freehandle || (!buildRes)) { if (freehandle || (!buildRes)) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
} }
#else
qTableQuery(*qhandle); // do execute query
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
#endif
} }
return code; return code;
...@@ -309,12 +328,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -309,12 +328,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
assert(buildRes == true);
#if _NON_BLOCKING_RETRIEVE
if (!buildRes) { if (!buildRes) {
assert(pReadMsg->rpcMsg.handle != NULL);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
return TSDB_CODE_QRY_NOT_READY; return TSDB_CODE_QRY_NOT_READY;
} }
#endif
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); // ahandle is the sqlObj pointer
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
} }
// If qhandle is not added into vread queue, the query should be completed already or paused with error. // If qhandle is not added into vread queue, the query should be completed already or paused with error.
......
...@@ -151,7 +151,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -151,7 +151,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pTable->tableId); vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pTable->tableId);
STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->sid)}; STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->tid)};
if (tsdbDropTable(pVnode->tsdb, tableId) < 0) code = terrno; if (tsdbDropTable(pVnode->tsdb, tableId) < 0) code = terrno;
......
...@@ -91,4 +91,60 @@ endi ...@@ -91,4 +91,60 @@ endi
sql_error select max(c2*2) from $tb sql_error select max(c2*2) from $tb
sql_error select max(c1-c2) from $tb sql_error select max(c1-c2) from $tb
print =====================> td-1764
sql select sum(c1)/count(*), sum(c1) as b, count(*) as b from $stb interval(1y)
if $rows != 1 then
return -1
endi
if $data00 != @18-01-01 00:00:00.000@ then
return -1
endi
if $data01 != 2.250000000 then
return -1
endi
if $data02 != 225000 then
return -1
endi
sql select first(c1) - last(c1), first(c1) as b, last(c1) as b, min(c1) - max(c1), spread(c1) from ca_stb0 interval(1y)
if $rows != 1 then
return -1
endi
if $data00 != @18-01-01 00:00:00.000@ then
return -1
endi
if $data01 != -9.000000000 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 9 then
return -1
endi
if $data04 != -9.000000000 then
return -1
endi
if $data05 != 9.000000000 then
return -1
endi
sql_error select first(c1, c2) - last(c1, c2) from stb interval(1y)
sql_error select first(ts) - last(ts) from stb interval(1y)
sql_error select top(c1, 2) - last(c1) from stb;
sql_error select stddev(c1) - last(c1) from stb;
sql_error select diff(c1) - last(c1) from stb;
sql_error select first(c7) - last(c7) from stb;
sql_error select first(c8) - last(c8) from stb;
sql_error select first(c9) - last(c9) from stb;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -353,4 +353,36 @@ sql_error select from t1 ...@@ -353,4 +353,36 @@ sql_error select from t1
sql_error select abc from t1 sql_error select abc from t1
sql_error select abc as tu from t1 sql_error select abc as tu from t1
print ========================> td-1756
sql_error select * from t1 where ts>now-1y
sql_error select * from t1 where ts>now-1n
print ========================> td-1752
sql select * from db.st2 where t2 < 200 and t2 is not null;
if $rows != 1 then
return -1
endi
if $data00 != @19-12-09 16:27:35.000@ then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data02 != 1 then
return -1
endi
sql select * from db.st2 where t2 > 200 or t2 is null;
if $rows != 0 then
return -1
endi
sql select * from st2 where t2 < 200 and t2 is null;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -96,5 +96,10 @@ if $rows != 14 then ...@@ -96,5 +96,10 @@ if $rows != 14 then
return -1 return -1
endi endi
print ===============================> td-1765
sql create table m1(ts timestamp, k int) tags(a binary(4), b nchar(4));
sql create table tm0 using m1 tags('abcd', 'abcd');
sql_error alter table tm0 set tag b = 'abcd1';
sql_error alter table tm0 set tag a = 'abcd1';
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -251,7 +251,7 @@ if $rows != 15 then ...@@ -251,7 +251,7 @@ if $rows != 15 then
endi endi
# first subclause are empty # first subclause are empty
sql select count(*) as c from union_tb0 where ts>now+10y union all select sum(c1) as c from union_tb1; sql select count(*) as c from union_tb0 where ts > now + 3650d union all select sum(c1) as c from union_tb1;
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
...@@ -346,7 +346,7 @@ if $data91 != 99 then ...@@ -346,7 +346,7 @@ if $data91 != 99 then
return -1 return -1
endi endi
#1111111111111111111111111111111111111111111111111 #=================================================================================================
# two aggregated functions for normal tables # two aggregated functions for normal tables
sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2; sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2;
if $rows != 2 then if $rows != 2 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册