未验证 提交 988b9447 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10977 from taosdata/feature/3.0_liaohj

[td-13039] support nchar data type.
......@@ -159,6 +159,7 @@ typedef struct {
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
// TODO remove this function
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
......
......@@ -84,6 +84,8 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
#define NCHAR_WIDTH_TO_BYTES(n) ((n) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE)
typedef int32_t VarDataOffsetT;
typedef struct tstr {
......
......@@ -155,6 +155,7 @@ typedef struct SReqResultInfo {
TAOS_FIELD* fields;
uint32_t numOfCols;
int32_t* length;
char** convertBuf;
TAOS_ROW row;
SResultColumn* pCol;
uint32_t numOfRows;
......
......@@ -169,6 +169,13 @@ static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
tfree(pResInfo->row);
tfree(pResInfo->pCol);
tfree(pResInfo->fields);
if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
tfree(pResInfo->convertBuf[i]);
}
tfree(pResInfo->convertBuf);
}
}
static void doDestroyRequest(void *p) {
......
......@@ -634,18 +634,30 @@ _return:
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
SResultColumn* pCol = &pResultInfo->pCol[i];
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
if (IS_VAR_DATA_TYPE(type)) {
if (pCol->offset[pResultInfo->current] != -1) {
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
pResultInfo->length[i] = varDataLen(pStart);
pResultInfo->row[i] = varDataVal(pStart);
if (type == TSDB_DATA_TYPE_NCHAR) {
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(pResultInfo->convertBuf[i]));
ASSERT(len <= bytes);
pResultInfo->row[i] = varDataVal(pResultInfo->convertBuf[i]);
varDataSetLen(pResultInfo->convertBuf[i], len);
pResultInfo->length[i] = len;
}
} else {
pResultInfo->row[i] = NULL;
}
} else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current;
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
} else {
pResultInfo->row[i] = NULL;
}
......@@ -661,13 +673,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn));
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}
pResInfo->convertBuf = calloc(pResInfo->numOfCols, POINTER_BYTES);
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
for(int32_t i = 0; i < pResInfo->numOfCols; ++i) {
if(pResInfo->fields[i].type == TSDB_DATA_TYPE_NCHAR) {
pResInfo->convertBuf[i] = calloc(1, NCHAR_WIDTH_TO_BYTES(pResInfo->fields[i].bytes));
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
......
......@@ -52,7 +52,7 @@ TEST(testCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
}
#if 0
TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
......@@ -652,6 +652,7 @@ TEST(testCase, projection_query_stables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -660,7 +661,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu");
pRes = taos_query(pConn, "select k from tm0");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
......
......@@ -7086,10 +7086,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
......@@ -7097,7 +7097,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResult
*newgroup = true;
}
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) {
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup, SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
......@@ -7108,12 +7108,13 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInf
// handle the cached new group data block
if (pInfo->existNewGroupBlock) {
// doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
}
}
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
SFillOperatorInfo *pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
blockDataCleanup(pInfo->pRes);
......@@ -7121,7 +7122,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
return NULL;
}
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes;
}
......@@ -7142,7 +7143,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
} else {
if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) {
......@@ -7150,7 +7151,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
return NULL;
}
// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
} else {
pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
......@@ -7168,14 +7169,13 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
return pInfo->pRes;
}
// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
// doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return pInfo->pRes;
}
......@@ -7863,10 +7863,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pInfo->intervalInfo = *pInterval;
SResultInfo* pResultInfo = &pOperator->resultInfo;
// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
// if (code != TSDB_CODE_SUCCESS) {
// goto _error;
// }
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*) fillVal, pTaskInfo->window, pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
......@@ -7881,7 +7881,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
......
......@@ -48,11 +48,6 @@
} \
} while (0)
enum {
TSDB_USE_SERVER_TS = 0,
TSDB_USE_CLI_TS = 1,
};
typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
char *pSql; // input
......@@ -303,20 +298,7 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start)
}
TSKEY k = *(TSKEY *)start;
if (k == INT64_MIN) {
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
return TSDB_CODE_FAILED; // client time/server time can not be mixed
}
pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
} else {
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
return TSDB_CODE_FAILED; // client time/server time can not be mixed
}
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
}
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
if (k <= pDataBlocks->prevTS) {
pDataBlocks->ordered = false;
}
......
......@@ -141,7 +141,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
......
......@@ -52,7 +52,11 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicNode = nullptr;
SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery };
SPlanContext cxt = {0};
cxt.queryId = 1;
cxt.acctId = 0;
cxt.streamQuery = streamQuery;
setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicNode);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -37,8 +37,8 @@ while $dbCnt < 2
print =============== create super table, include all type
print notes: after nchar show ok, modify binary to nchar
sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` binary(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16))
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` binary(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16))
sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` nchar(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16))
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` nchar(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16))
sql show stables
print rows: $rows
......@@ -96,6 +96,7 @@ while $dbCnt < 2
return -1
endi
if $data03 != table then
print expect table, actual $data03
return -1
endi
......@@ -173,6 +174,10 @@ while $dbCnt < 2
#if $rows != 4 then
# return -1
#endi
##sql select * from st
##if $rows != 4 then
## return -1
##endi
endw
......
Subproject commit 8145dd1713ab9e7652ea621ca7e6895fd0b21d65
Subproject commit f36b07f710d661dca88fdd70e73b5e3e16a960e0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册