提交 e016a975 编写于 作者: H Haojun Liao

[td-90]

上级 1273abc2
...@@ -364,7 +364,7 @@ char tTokenTypeSwitcher[13] = { ...@@ -364,7 +364,7 @@ char tTokenTypeSwitcher[13] = {
}; };
bool isValidDataType(int32_t type, int32_t length) { bool isValidDataType(int32_t type, int32_t length) {
if (type < TSDB_DATA_TYPE_BOOL || type > TSDB_DATA_TYPE_NCHAR) { if (type < TSDB_DATA_TYPE_NULL || type > TSDB_DATA_TYPE_NCHAR) {
return false; return false;
} }
......
...@@ -109,8 +109,8 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); ...@@ -109,8 +109,8 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val); void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes);
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes); char* tsdbGetTableName(TsdbRepoT *repo, const STableId *id);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
......
...@@ -2224,24 +2224,26 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2224,24 +2224,26 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
* set tag value in SQLFunctionCtx * set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer * e.g.,tag information into input buffer
*/ */
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *param) { static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type,
tVariantDestroy(param); int16_t bytes) {
tVariantDestroy(tag);
char * val = NULL;
int16_t bytes = 0;
int16_t type = 0;
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
val = tsdbGetTableName(tsdb, pTableId, &bytes); char* val = tsdbGetTableName(tsdb, pTableId);
type = TSDB_DATA_TYPE_BINARY; assert(val != NULL);
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type);
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY);
} else { } else {
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val); char* val = tsdbGetTableTagVal(tsdb, pTableId, tagColId, type, bytes);
if (val == NULL) {
tag->nType = TSDB_DATA_TYPE_NULL;
return;
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type); tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
} else { } else {
tVariantCreateFromBinary(param, val, bytes, type); tVariantCreateFromBinary(tag, val, bytes, type);
} }
} }
} }
...@@ -2249,25 +2251,29 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI ...@@ -2249,25 +2251,29 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base; SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(tsdb, pTableId, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); assert(pExprInfo->base.numOfParams == 1);
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag,
pExprInfo->type, pExprInfo->bytes);
} else { } else {
// set tag value, by which the results are aggregated. // set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SColIndex *pCol = &pQuery->pSelectExpr[idx].base.colInfo; SExprInfo* pExprInfo = &pQuery->pSelectExpr[idx];
// ts_comp column required the tag value for join filter // ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pCol->flag)) { if (!TSDB_COL_IS_TAG(pExprInfo->base.colInfo.flag)) {
continue; continue;
} }
// todo use tag column index to optimize performance // todo use tag column index to optimize performance
doSetTagValueInParam(tsdb, pTableId, pCol->colId, &pRuntimeEnv->pCtx[idx].tag); doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag,
pExprInfo->type, pExprInfo->bytes);
} }
// set the join tag for first column // set the join tag for first column
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1); assert(pFuncMsg->numOfParams == 1);
...@@ -6005,7 +6011,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6005,7 +6011,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
num = taosArrayGetSize(pa); num = taosArrayGetSize(pa);
assert(num == pQInfo->groupInfo.numOfTables); assert(num == pQInfo->groupInfo.numOfTables);
int16_t type, bytes; // int16_t type, bytes;
int32_t functionId = pQuery->pSelectExpr[0].base.functionId; int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
...@@ -6013,7 +6019,6 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6013,7 +6019,6 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
int32_t rsize = pExprInfo->bytes; int32_t rsize = pExprInfo->bytes;
char* data = NULL;
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SGroupItem* item = taosArrayGet(pa, i); SGroupItem* item = taosArrayGet(pa, i);
...@@ -6031,8 +6036,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6031,8 +6036,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
*(int32_t*) output = pQInfo->vgId; *(int32_t*) output = pQInfo->vgId;
output += sizeof(pQInfo->vgId); output += sizeof(pQInfo->vgId);
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data); int16_t bytes = pExprInfo->bytes;
memcpy(output, data, bytes); int16_t type = pExprInfo->type;
char* val = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, type, bytes);
// todo refactor
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (val == NULL) {
setVardataNull(output, type);
} else {
memcpy(output, val, varDataTLen(val));
}
} else {
if (val == NULL) {
setNull(output, type, bytes);
} else {
memcpy(output, val, bytes);
}
}
} }
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num); qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
...@@ -6041,17 +6063,18 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6041,17 +6063,18 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SExprInfo* pExprInfo = pQuery->pSelectExpr; SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i); SGroupItem* item = taosArrayGet(pa, i);
char* data = NULL;
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
// todo check the return value, refactor codes // todo check the return value, refactor codes
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(pQInfo->tsdb, &item->id, &bytes); char* data = tsdbGetTableName(pQInfo->tsdb, &item->id);
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
memcpy(dst, data, varDataTLen(data)); memcpy(dst, data, varDataTLen(data));
} else {// todo refactor, return the true length of binary|nchar data } else {// todo refactor
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data); int16_t type = pExprInfo[j].type;
assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type); int16_t bytes = pExprInfo[j].bytes;
char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes);
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes; char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
......
...@@ -245,45 +245,33 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { ...@@ -245,45 +245,33 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
} }
} }
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes) {
STsdbMeta* pMeta = tsdbGetMeta(repo); STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id->uid); STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable); STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId); STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) { if (pCol == NULL) {
return -1; // No matched tag volumn return NULL; // No matched tag volumn
} }
*val = tdGetKVRowValOfCol(pTable->tagVal, colId); char* val = tdGetKVRowValOfCol(pTable->tagVal, colId);
*type = pCol->type; assert(type == pCol->type && bytes == pCol->bytes);
if (*val != NULL) { if (val != NULL && IS_VAR_DATA_TYPE(type)) {
if (IS_VAR_DATA_TYPE(*type)) { assert(varDataLen(val) < pCol->bytes);
*bytes = varDataLen(*val);
} else {
*bytes = TYPE_BYTES[*type];
}
} }
return TSDB_CODE_SUCCESS; return val;
} }
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) { char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id) {
STsdbMeta* pMeta = tsdbGetMeta(repo); STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id->uid); STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
if (pTable == NULL) { if (pTable == NULL) {
if (bytes != NULL) {
*bytes = 0;
}
return NULL; return NULL;
} else { } else {
if (bytes != NULL) {
*bytes = varDataLen(pTable->name);
}
return (char*) pTable->name; return (char*) pTable->name;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册