提交 90dff224 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

......@@ -23,6 +23,8 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
......@@ -268,7 +270,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}
......@@ -803,6 +805,101 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static char* parseTagDatatoJson(void *p){
char* string = NULL;
cJSON *json = cJSON_CreateObject();
if (json == NULL)
{
goto end;
}
int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0){
if(*val == TSDB_DATA_TYPE_NULL){
string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string)));
goto end;
}
continue;
}
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if(type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0){
char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
taosMemoryFree(tagJsonValue);
goto end;
}
value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue);
if (value == NULL)
{
goto end;
}
}else if(varDataLen(realData) == 0){
value = cJSON_CreateString("");
}else{
ASSERT(0);
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_DOUBLE){
double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else{
ASSERT(0);
}
}
string = cJSON_PrintUnformatted(json);
end:
cJSON_Delete(json);
return string;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
......@@ -833,9 +930,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}
if (type == TSDB_DATA_TYPE_JSON) {
}else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -848,6 +943,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
......@@ -855,15 +951,9 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
int32_t length =
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0;
}
varDataSetLen(dst, length);
char *jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
......
......@@ -122,10 +122,14 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = kvRowLen(pData + CHAR_BYTES);
} else {
ASSERT(0);
}
dataLen += CHAR_BYTES;
}
......
......@@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process
int32_t tsMultiProcess = 0;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1;
// queue & threads
......@@ -380,11 +380,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
......
......@@ -4207,7 +4207,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) {
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfCols, &pInfo->rowCellInfoOffset);
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
}
pOperator->name = "TableAggregate";
......
......@@ -300,10 +300,26 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
p = data;
}else{
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
taosMemoryFree((void*)p);
}
}
}
......@@ -1587,8 +1603,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false);
} else { // it is a tag value
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
if(pDst->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return NULL;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
colDataAppend(pDst, count, data, false);
taosMemoryFree(data);
}else{
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
}
}
}
......
......@@ -365,8 +365,8 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue;
}
// key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: LONG_BYTES
tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES, 1);
// key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: DOUBLE_BYTES
tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES, 1);
if (!tagKV) {
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
......@@ -411,13 +411,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
}
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
*valueType =
(item->valuedouble - (int64_t)(item->valuedouble) == 0) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_DOUBLE;
if (*valueType == TSDB_DATA_TYPE_DOUBLE)
*((double*)valueData) = item->valuedouble;
else if (*valueType == TSDB_DATA_TYPE_BIGINT)
*((int64_t*)valueData) = item->valueint;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES);
*valueType = TSDB_DATA_TYPE_DOUBLE;
*((double*)valueData) = item->valuedouble;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES);
} else if (item->type == cJSON_True || item->type == cJSON_False) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
......
......@@ -899,7 +899,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
}
int32_t code = 0;
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};
// TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......
......@@ -922,7 +922,8 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
}
}
char *getJsonValue(char *json, char *key){ //todo
char *getJsonValue(char *json, char *key){ //todo
json++; // jump type
int16_t cols = kvRowNCols(json);
for (int i = 0; i < cols; ++i) {
SColIdx *pColIdx = kvRowColIdxAt(json, i);
......
......@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
SNode *pLeft = NULL, *pRight = NULL;
scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar);
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, varDataLen(json), 1, json);
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, kvRowLen(json), 1, json);
scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight);
}
......@@ -1088,18 +1088,17 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
}else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV ||
opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){
double tmp = *((double *)colDataGetData(column, 0));
ASSERT_TRUE(tmp == exceptValue);
printf("result:%lf\n", tmp);
printf("1result:%f,except:%f\n", *((double *)colDataGetData(column, 0)), exceptValue);
ASSERT_TRUE(abs(*((double *)colDataGetData(column, 0)) - exceptValue) < 1e-15);
}else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){
printf("2result:%ld,except:%f\n", *((int64_t *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue);
printf("result:%ld\n", *((int64_t *)colDataGetData(column, 0)));
}else if(opType == OP_TYPE_GREATER_THAN || opType == OP_TYPE_GREATER_EQUAL || opType == OP_TYPE_LOWER_THAN ||
opType == OP_TYPE_LOWER_EQUAL || opType == OP_TYPE_EQUAL || opType == OP_TYPE_NOT_EQUAL ||
opType == OP_TYPE_IS_NULL || opType == OP_TYPE_IS_NOT_NULL || opType == OP_TYPE_IS_TRUE ||
opType == OP_TYPE_LIKE || opType == OP_TYPE_NOT_LIKE || opType == OP_TYPE_MATCH || opType == OP_TYPE_NMATCH){
printf("3result:%d,except:%f\n", *((bool *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((bool *)colDataGetData(column, 0)), exceptValue);
printf("result:%d\n", *((bool *)colDataGetData(column, 0)));
}
taosArrayDestroyEx(blockList, scltFreeDataBlock);
......@@ -1114,6 +1113,13 @@ TEST(columnTest, json_column_arith_op) {
tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 8;
EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV,
......@@ -1166,6 +1172,9 @@ TEST(columnTest, json_column_arith_op) {
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
}
tdDestroyKVRowBuilder(&kvRowBuilder);
taosMemoryFree(row);
}
void *prepareNchar(char* rightData){
......@@ -1186,6 +1195,13 @@ TEST(columnTest, json_column_logic_op) {
tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 9;
const int32_t len1 = 4;
......@@ -1223,7 +1239,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json null---------------------\n");
key = "k3";
double eRes2[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX};
bool eRes2[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes2[i], op[i]);
}
......@@ -1262,7 +1278,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json double---------------------\n");
key = "k6";
bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, false, false, true};
bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, true, false, true};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
}
......@@ -1275,7 +1291,7 @@ TEST(columnTest, json_column_logic_op) {
printf("---------------------json not exist--------------------\n");
key = "k10";
double eRes10[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX};
double eRes10[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes10[i], op[i]);
}
......@@ -1284,6 +1300,9 @@ TEST(columnTest, json_column_logic_op) {
makeCalculate(row, key, TSDB_DATA_TYPE_NCHAR, rightData, eRes10[i], op[i]);
taosMemoryFree(rightData);
}
tdDestroyKVRowBuilder(&kvRowBuilder);
taosMemoryFree(row);
}
TEST(columnTest, smallint_value_add_int_column) {
......
......@@ -1079,6 +1079,291 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 10 end ...... ")
def tmqCase11(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 11: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db11', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 11 end ...... ")
def tmqCase12(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 12: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db12', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 12 end ...... ")
def tmqCase13(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 13: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db13', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*(1/2+1/4):
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 2
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 3
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 13 end ...... ")
def run(self):
tdSql.prepare()
......@@ -1099,8 +1384,10 @@ class TDTestCase:
self.tmqCase7(cfgPath, buildPath)
self.tmqCase8(cfgPath, buildPath)
self.tmqCase9(cfgPath, buildPath)
self.tmqCase10(cfgPath, buildPath)
self.tmqCase10(cfgPath, buildPath)
self.tmqCase11(cfgPath, buildPath)
self.tmqCase12(cfgPath, buildPath)
self.tmqCase13(cfgPath, buildPath)
def stop(self):
tdSql.close()
......
......@@ -315,6 +315,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length);
buf[length] = 0;
taosFprintfFile(pFile, "\'%s\'", buf);
......@@ -384,19 +385,25 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
while (pos < length) {
TdWchar wc;
int32_t bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
if (bytes == 0) {
if (bytes <= 0) {
break;
}
pos += bytes;
if (pos > length) {
if (pos + bytes > length) {
break;
}
int w = 0;
#ifdef WINDOWS
int32_t w = bytes;
w = bytes;
#else
int32_t w = taosWcharWidth(wc);
if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){
w = bytes;
}else{
w = taosWcharWidth(wc);
}
#endif
pos += bytes;
if (w <= 0) {
continue;
}
......@@ -496,6 +503,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......@@ -604,7 +612,6 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
case TSDB_DATA_TYPE_DOUBLE:
return TMAX(25, width);
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY:
if (field->bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width);
......@@ -612,7 +619,8 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
return TMAX(field->bytes, width);
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON: {
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册