提交 1a4882a6 编写于 作者: C Cary Xu

Merge branch 'develop' into hotfix/TS-854-D

......@@ -3,7 +3,7 @@
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
set -e
#set -x
set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
......@@ -414,10 +414,14 @@ fi
if [[ "$httpdBuild" == "true" ]]; then
BUILD_HTTP=true
BUILD_TOOLS=false
else
BUILD_HTTP=false
fi
if [[ "$pagMode" == "full" ]]; then
BUILD_TOOLS=true
else
BUILD_TOOLS=false
fi
# check support cpu type
......@@ -514,15 +518,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
......@@ -537,15 +543,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
......
......@@ -47,7 +47,8 @@ if [ "$osType" != "Darwin" ]; then
else
bin_files="${script_dir}/remove_client.sh \
${script_dir}/set_core.sh \
${script_dir}/get_client.sh ${script_dir}/taosd-dump-cfg.gdb"
${script_dir}/get_client.sh"
#${script_dir}/get_client.sh ${script_dir}/taosd-dump-cfg.gdb"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
else
......
......@@ -77,7 +77,7 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taosdump ${install_dir}/bin/jh_taosdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -77,7 +77,7 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taosdump ${install_dir}/bin/khdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -117,7 +117,7 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -77,7 +77,7 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taosdump ${install_dir}/bin/prodump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -77,7 +77,7 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taosdump ${install_dir}/bin/tqdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -256,7 +256,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson);
void tscDequoteAndTrimToken(SStrToken* pToken);
void tscRmEscapeAndTrimToken(SStrToken* pToken);
......@@ -264,7 +264,7 @@ int32_t tscValidateName(SStrToken* pToken, bool escapeEnabled, bool *dbIncluded)
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
......@@ -394,6 +394,9 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
int8_t jsonType2DbType(double data, int jsonType);
void getJsonKey(SStrToken *t0);
char* cloneCurrentDBName(SSqlObj* pSql);
......
......@@ -453,7 +453,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
......
......@@ -547,6 +547,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_JSON: {
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int precision = taos_result_precision(result);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]), precision);
......
......@@ -85,16 +85,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(dst, type, pField->bytes);
int32_t bytes = pSchema[i].bytes;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY){
bytes -= VARSTR_HEADER_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
}
}
else if(pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
bytes -= VARSTR_HEADER_SIZE;
bytes = bytes / TSDB_NCHAR_SIZE;
}
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
......@@ -222,7 +221,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
return -1;
}
uint8_t type = fields[idx].type;
int32_t length = lengths[idx];
int32_t length = lengths[idx];
switch (type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -248,6 +247,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
memcpy(result, val, length);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
......@@ -355,6 +355,19 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
}
break;
case TSDB_DATA_TYPE_JSON:
if (pToken->n >= pSchema->bytes) { // reserve 1 byte for select
return tscInvalidOperationMsg(msg, "json tag length too long", pToken->z);
}
if (pToken->type == TK_NULL) {
*(int8_t *)payload = TSDB_DATA_TINYINT_NULL;
} else if (pToken->type != TK_STRING){
tscInvalidOperationMsg(msg, "invalid json data", pToken->z);
} else{
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
......@@ -1058,9 +1071,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return code;
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal, false);
if(pSchema->type == TSDB_DATA_TYPE_JSON){
assert(spd.numOfBound == 1);
if(sToken.n > TSDB_MAX_JSON_TAGS_LEN/TSDB_NCHAR_SIZE){
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "json tag too long", NULL);
}
char* json = strndup(sToken.z, sToken.n);
code = parseJsontoTagData(json, &kvRowBuilder, pInsertParam->msg, pTagSchema[spd.boundedColumns[0]].colId);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
tfree(json);
return code;
}
tfree(json);
}
}
tscDestroyBoundColumnInfo(&spd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
......@@ -1074,7 +1104,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (pInsertParam->tagData.dataLen <= 0){
return tscSQLSyntaxErrMsg(pInsertParam->msg, "tag value expected", NULL);
}
char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen);
if (pTag == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
此差异已折叠。
......@@ -832,7 +832,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
tscError("0x%"PRIx64" table schema is not matched with parsed sql", id);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -1510,7 +1510,8 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateTableSql *pCreateTableInfo = pInfo->pCreateTableInfo;
if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
int32_t numOfTables = (int32_t)taosArrayGetSize(pInfo->pCreateTableInfo->childTableInfo);
size += numOfTables * (sizeof(SCreateTableMsg) + TSDB_MAX_TAGS_LEN);
size += numOfTables * (sizeof(SCreateTableMsg) +
((TSDB_MAX_TAGS_LEN > TSDB_MAX_JSON_TAGS_LEN)?TSDB_MAX_TAGS_LEN:TSDB_MAX_JSON_TAGS_LEN));
} else {
size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
}
......@@ -1632,9 +1633,6 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
}
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
......@@ -1663,14 +1661,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSchema++;
}
pMsg = (char *)pSchema;
pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
if (pAlterInfo->tagData.dataLen > 0) {
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
}
pMsg += pAlterInfo->tagData.dataLen;
msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
int msgLen = sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
......@@ -1872,7 +1863,9 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo, pSql->self, true);
bool convertJson = true;
if (pQueryInfo->isStddev == true) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......
......@@ -445,7 +445,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
// revise the length for binary and nchar fields
if (f[j].type == TSDB_DATA_TYPE_BINARY) {
f[j].bytes -= VARSTR_HEADER_SIZE;
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR) {
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR || f[j].type == TSDB_DATA_TYPE_JSON) {
f[j].bytes = (f[j].bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
}
......
......@@ -227,6 +227,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
if (skipped) {
slot = 0;
stackidx = 0;
tVariantDestroy(&tag);
continue;
}
......@@ -334,6 +335,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
}
if (mergeDone) {
tVariantDestroy(&tag);
break;
}
......@@ -341,6 +343,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
stackidx = 0;
skipRemainValue(mainCtx->p->pTSBuf, &tag);
tVariantDestroy(&tag);
}
stackidx = 0;
......@@ -633,16 +636,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the join condition tag column info, todo extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
tVariant* pVariant = &pExpr->base.param[0];
tVariant* pVariant = &pExpr->base.param[pExpr->base.numOfParams];
pVariant->i64 = colId;
pVariant->nType = TSDB_DATA_TYPE_BIGINT;
pVariant->nLen = sizeof(int64_t);
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams++;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
......@@ -729,8 +737,15 @@ int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
if (t1->padding == TSDB_DATA_TYPE_JSON){
bool canReturn = true;
int32_t result = jsonCompareUnit(t1->tag, t2->tag, &canReturn);
if(canReturn) return result;
__compar_fn_t func = getComparFunc(t1->tag[0], 0);
return func(t1->tag + CHAR_BYTES, t2->tag + CHAR_BYTES);
}
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
}
......@@ -821,16 +836,21 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
SExprInfo *pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SExprInfo *pExpr = tscExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[0].i64 = tagColId;
pExpr->base.param[0].nLen = sizeof(int64_t);
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[pExpr->base.numOfParams].i64 = tagColId;
pExpr->base.param[pExpr->base.numOfParams].nLen = sizeof(int64_t);
pExpr->base.param[pExpr->base.numOfParams].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams++;
}
// add the filter tag column
......@@ -2012,7 +2032,12 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
SExprInfo* pExpr = tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
if(strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName) > 0){
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pTagCond->joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
......@@ -2025,15 +2050,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param->i64 = tagColId;
pExpr->base.numOfParams = 1;
}
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
......@@ -2427,6 +2443,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
tscInitQueryInfo(pNewQueryInfo);
pNewQueryInfo->isStddev = true; // for json tag
// add the group cond
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
......@@ -2490,6 +2507,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
}
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG, getNewResColId(pCmd));
if (schema->type == TSDB_DATA_TYPE_JSON){
p->base.numOfParams = pExpr->base.numOfParams;
tVariantAssign(&p->base.param[0], &pExpr->base.param[0]);
}
p->base.resColId = pExpr->base.resColId;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
......@@ -3681,7 +3702,9 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
if (pQueryInfo->isStddev && type == TSDB_DATA_TYPE_JSON){ // for json tag compare in the second round of stddev
pRes->tsrow[j] = pRes->urow[i];
}else if (!IS_VAR_DATA_TYPE(type) && type != TSDB_DATA_TYPE_JSON) {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
......
......@@ -29,6 +29,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "cJSON.h"
#ifdef HTTP_EMBEDDED
#include "httpInt.h"
......@@ -109,6 +110,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
n = bufSize + escapeSize;
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
......@@ -732,34 +734,33 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar) {
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar, bool convertJson) {
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
if (pInfo->pExpr->base.param[0].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
assert(pInfo->pExpr->base.param[0].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[1].nLen);
memcpy(varDataVal(p), pInfo->pExpr->base.param[0].pz, pInfo->pExpr->base.param[0].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[0].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
memcpy(p, &pInfo->pExpr->base.param[0].i64, pInfo->field.bytes);
}
}
}
} else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
} else if (convertNchar && (pInfo->field.type == TSDB_DATA_TYPE_NCHAR)) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if(buffer == NULL)
return ;
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
......@@ -783,8 +784,63 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else if (pInfo->field.type == TSDB_DATA_TYPE_JSON) {
if (convertJson){
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
char type = *p;
char* realData = p + CHAR_BYTES;
if (type == TSDB_DATA_TYPE_JSON && isNull(realData, TSDB_DATA_TYPE_JSON)) {
memcpy(dst, realData, varDataTLen(realData));
} else if (type == TSDB_DATA_TYPE_BINARY) {
assert(*(uint32_t*)varDataVal(realData) == TSDB_DATA_JSON_null); // json null value
assert(varDataLen(realData) == INT_BYTES);
sprintf(varDataVal(dst), "%s", "null");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_JSON) {
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), POINTER_SHIFT(varDataVal(dst), CHAR_BYTES));
*(char*)(POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES)) = '\"';
varDataSetLen(dst, length + CHAR_BYTES*2);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(realData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)realData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
assert(0);
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else{
// if convertJson is false, json data as raw data used for stddev for the second round
}
}
if (convertNchar) {
......@@ -792,6 +848,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
}
}
void tscJson2String(char *src, char* dst){
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) {
......@@ -805,11 +865,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true, true);
}
}
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson) {
assert(pRes->numOfCols > 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -820,7 +880,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, convertNchar);
setResRawPtrImpl(pRes, pInfo, i, convertNchar, convertJson);
/*
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.ColName.flag)) {
......@@ -876,17 +936,6 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
}
}
static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pTableCols, i);
pColInfo[i] = pCol->info;//[index].type;
}
return pColInfo;
}
typedef struct SDummyInputInfo {
SSDataBlock *block;
STableQueryInfo *pTableQueryInfo;
......@@ -1276,14 +1325,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar, convertJson);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
......@@ -1317,8 +1366,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
// handle the following query process
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
STableMeta* pTableMeta = tscGetMetaInfo(px, 0)->pTableMeta;
SSchema* pSchema = tscGetTableSchema(pTableMeta);
......@@ -1433,7 +1480,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
px->pQInfo->runtimeEnv.udfIsCopy = true;
px->pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
tfree(pColumnInfo);
tfree(schema);
// set the pRuntimeEnv for pSourceOperator
......@@ -1442,7 +1488,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px, pSql->self, false);
convertQueryResult(pOutput, px, pSql->self, false, false);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......@@ -3086,12 +3132,12 @@ void tscIncStreamExecutionCount(void* pStream) {
ps->num += 1;
}
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams) {
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
if (pTableMetaInfo->pTableMeta == NULL) {
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId <= TSDB_UD_COLUMN_INDEX) {
return true;
}
......@@ -5384,3 +5430,152 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p;
}
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId){
// set json NULL data
uint8_t nullTypeVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
uint32_t jsonNULL = TSDB_DATA_JSON_NULL;
int jsonIndex = startColId + 1;
char nullTypeKey[VARSTR_HEADER_SIZE + INT_BYTES] = {0};
varDataSetLen(nullTypeKey, INT_BYTES);
nullTypeVal[0] = TSDB_DATA_TYPE_JSON;
varDataSetLen(nullTypeVal + CHAR_BYTES, INT_BYTES);
*(uint32_t*)(varDataVal(nullTypeKey)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeKey, false); // add json null type
if (strtrim(json) == 0 || strcasecmp(json, "null") == 0){
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json null value
return TSDB_CODE_SUCCESS;
}
int32_t jsonNotNull = TSDB_DATA_JSON_NOT_NULL;
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNotNull;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json type
// set json real data
cJSON *root = cJSON_Parse(json);
if (root == NULL){
tscError("json parse error");
return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL);
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root)){
tscError("json error invalide value");
return tscSQLSyntaxErrMsg(errMsg, "json error invalide value", NULL);
}
int retCode = 0;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
retCode = tscSQLSyntaxErrMsg(errMsg, "json inner error", NULL);
goto end;
}
char *jsonKey = item->string;
if(!isValidateTag(jsonKey)){
tscError("json key not validate");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key not validate", NULL);
goto end;
}
if(strlen(jsonKey) > TSDB_MAX_JSON_KEY_LEN){
tscError("json key too long error");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key too long, more than 256", NULL);
goto end;
}
if(strlen(jsonKey) == 0 || taosHashGet(keyHash, jsonKey, strlen(jsonKey)) != NULL){
continue;
}
// json key encode by binary
char tagKey[TSDB_MAX_JSON_KEY_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(tagKey), jsonKey, strlen(jsonKey));
int32_t outLen = (int32_t)strlen(jsonKey);
taosHashPut(keyHash, jsonKey, outLen, &outLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
varDataSetLen(tagKey, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagKey, false); // add json key
if(item->type == cJSON_String){ // add json value format: type|data
char *jsonValue = item->valuestring;
outLen = 0;
char tagVal[TSDB_MAX_JSON_TAGS_LEN] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if (!taosMbsToUcs4(jsonValue, strlen(jsonValue), varDataVal(tagData),
TSDB_MAX_JSON_TAGS_LEN - CHAR_BYTES - VARSTR_HEADER_SIZE, &outLen)) {
tscError("json string error:%s|%s", strerror(errno), jsonValue);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagData, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true);
}else if(item->type == cJSON_Number){
char tagVal[LONG_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType(item->valuedouble, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if(*tagVal == TSDB_DATA_TYPE_DOUBLE) *((double *)tagData) = item->valuedouble;
else if(*tagVal == TSDB_DATA_TYPE_BIGINT) *((int64_t *)tagData) = item->valueint;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal, true);
}else if(item->type == cJSON_True || item->type == cJSON_False){
char tagVal[CHAR_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType((double)(item->valueint), item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
*tagData = (char)(item->valueint);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BOOL, tagVal, true);
}else if(item->type == cJSON_NULL){
char tagVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
int32_t* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
varDataSetLen(tagData, INT_BYTES);
*(uint32_t*)(varDataVal(tagData)) = TSDB_DATA_JSON_null;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BINARY, tagVal, true);
}
else{
retCode = tscSQLSyntaxErrMsg(errMsg, "invalidate json value", NULL);
goto end;
}
}
if(taosHashGetSize(keyHash) == 0){ // set json NULL true
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
memcpy(POINTER_SHIFT(kvRowBuilder->buf, kvRowBuilder->pColIdx[2].offset), nullTypeVal, CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES);
}
end:
taosHashCleanup(keyHash);
cJSON_Delete(root);
return retCode;
}
int8_t jsonType2DbType(double data, int jsonType){
switch(jsonType){
case cJSON_Number:
if (data - (int64_t)data > 0) return TSDB_DATA_TYPE_DOUBLE; else return TSDB_DATA_TYPE_BIGINT;
case cJSON_String:
return TSDB_DATA_TYPE_NCHAR;
case cJSON_NULL:
return TSDB_DATA_TYPE_BINARY;
case cJSON_True:
case cJSON_False:
return TSDB_DATA_TYPE_BOOL;
}
return TSDB_DATA_TYPE_NULL;
}
// get key from json->'key'
void getJsonKey(SStrToken *t0){
while(true){
t0->n = tGetToken(t0->z, &t0->type);
if (t0->type == TK_STRING){
t0->z++;
t0->n -= 2;
break;
}else if (t0->type == TK_ILLEGAL){
assert(0);
}
t0->z += t0->n;
}
}
......@@ -547,7 +547,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value, bool isJumpJsonVType) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
......@@ -560,9 +560,14 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
pBuilder->nCols++;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
char* jumpType = (char*)value;
if(isJumpJsonVType) jumpType += CHAR_BYTES;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(jumpType) : TYPE_BYTES[type];
if(isJumpJsonVType) tlen += CHAR_BYTES; // add type size
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
assert(pBuilder->alloc > 0);
pBuilder->alloc *= 2;
}
void* buf = realloc(pBuilder->buf, pBuilder->alloc);
......
......@@ -25,7 +25,7 @@ extern "C" {
// variant, each number/string/field_id has a corresponding struct during parsing sql
typedef struct tVariant {
uint32_t nType;
int32_t nType; // change uint to int, because in tVariantCreate() pVar->nType = -1; // -1 means error type
int32_t nLen; // only used for string, for number, it is useless
union {
int64_t i64;
......
......@@ -117,11 +117,12 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
tfree(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
tfree(pNode->pSchema);
}
free(pNode);
tfree(pNode);
}
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
......@@ -138,12 +139,12 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
tfree((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
tfree((*pExpr)->pSchema);
}
free(*pExpr);
tfree(*pExpr);
*pExpr = NULL;
}
......@@ -288,7 +289,7 @@ static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
tVariant* pVal = expr->pVal;
tbufWriteUint32(bw, pVal->nType);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufWriteInt32(bw, pVal->nLen);
tbufWrite(bw, pVal->pz, pVal->nLen);
} else {
......@@ -349,7 +350,7 @@ static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
pExpr->pVal = pVal;
pVal->nType = tbufReadUint32(br);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen));
pVal->pz = calloc(1, pVal->nLen + 1);
tbufReadToBuffer(br, pVal->pz, pVal->nLen);
......
......@@ -4,6 +4,7 @@
#include "tname.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tglobal.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
......@@ -251,6 +252,9 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
int32_t rowLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
if (pSchema[i].type == TSDB_DATA_TYPE_JSON && numOfCols != 1){
return false;
}
// 1. valid types
if (!isValidDataType(pSchema[i].type)) {
return false;
......@@ -301,8 +305,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
return false;
}
int32_t maxTagLen = TSDB_MAX_TAGS_LEN;
if (numOfTags == 1 && pSchema[numOfCols].type == TSDB_DATA_TYPE_JSON){
maxTagLen = TSDB_MAX_JSON_TAGS_LEN;
}
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, maxTagLen)) {
return false;
}
......
......@@ -18,7 +18,7 @@
#include "ttokendef.h"
#include "tscompression.h"
const int32_t TYPE_BYTES[15] = {
const int32_t TYPE_BYTES[16] = {
-1, // TSDB_DATA_TYPE_NULL
sizeof(int8_t), // TSDB_DATA_TYPE_BOOL
sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT
......@@ -34,6 +34,7 @@ const int32_t TYPE_BYTES[15] = {
sizeof(uint16_t), // TSDB_DATA_TYPE_USMALLINT
sizeof(uint32_t), // TSDB_DATA_TYPE_UINT
sizeof(uint64_t), // TSDB_DATA_TYPE_UBIGINT
sizeof(int8_t), // TSDB_DATA_TYPE_JSON
};
#define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \
......@@ -367,8 +368,8 @@ static void getStatics_nchr(const void *pData, int32_t numOfRow, int64_t *min, i
*maxIndex = 0;
}
tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
tDataTypeDescriptor tDataTypes[16] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
......@@ -376,13 +377,14 @@ tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_u64},
{TSDB_DATA_TYPE_JSON,4, TSDB_MAX_JSON_TAGS_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
};
char tTokenTypeSwitcher[13] = {
......@@ -428,7 +430,7 @@ FORCE_INLINE void* getDataMax(int32_t type) {
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_JSON;
}
void setVardataNull(void* val, int32_t type) {
......@@ -438,6 +440,9 @@ void setVardataNull(void* val, int32_t type) {
} else if (type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_NCHAR_NULL;
} else if (type == TSDB_DATA_TYPE_JSON) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_JSON_NULL;
} else {
assert(0);
}
......@@ -505,6 +510,7 @@ void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems) {
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
for (int32_t i = 0; i < numOfElems; ++i) {
setVardataNull(POINTER_SHIFT(val, i * bytes), type);
}
......
......@@ -158,7 +158,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
pVar->dKey = GET_FLOAT_VAL(pz);
break;
}
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
case TSDB_DATA_TYPE_NCHAR:{ // here we get the nchar length from raw binary bits length
size_t lenInwchar = len / TSDB_NCHAR_SIZE;
pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
......@@ -167,7 +167,13 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break;
}
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
case TSDB_DATA_TYPE_JSON:{
pVar->pz = calloc(len + 2, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
break;
}
case TSDB_DATA_TYPE_BINARY:{
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
......@@ -185,7 +191,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
void tVariantDestroy(tVariant *pVar) {
if (pVar == NULL) return;
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR || pVar->nType == TSDB_DATA_TYPE_JSON) {
tfree(pVar->pz);
pVar->nLen = 0;
}
......@@ -210,11 +216,41 @@ bool tVariantIsValid(tVariant *pVar) {
return isValidDataType(pVar->nType);
}
bool tVariantTypeMatch(tVariant *pVar, int8_t dbType){
switch (dbType) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
if(pVar->nType != TSDB_DATA_TYPE_BINARY && pVar->nType != TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:{
if(pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
}
return true;
}
void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return;
pDst->nType = pSrc->nType;
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR || pSrc->nType == TSDB_DATA_TYPE_JSON) {
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
char* p = realloc(pDst->pz, len);
assert(p);
......@@ -249,7 +285,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
}
}
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) {
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY && isValidDataType(pDst->nType)) { // if pDst->nType=-1, core dump. eg: where intcolumn=999999999999999999999999999
pDst->nLen = tDataTypes[pDst->nType].bytes;
}
}
......@@ -267,7 +303,7 @@ int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
return 1;
}
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR) {
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR || p1->nType == TSDB_DATA_TYPE_JSON) {
if (p1->nLen == p2->nLen) {
return memcmp(p1->pz, p2->pz, p1->nLen);
} else {
......@@ -815,7 +851,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:{
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint8_t*) payload = TSDB_DATA_BINARY_NULL;
......@@ -852,7 +888,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:{
int32_t newlen = 0;
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
......@@ -888,6 +924,16 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_JSON:{
if (pVariant->nType == TSDB_DATA_TYPE_BINARY){
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
} else if (pVariant->nType == TSDB_DATA_TYPE_JSON){ // select * from stable, set tag type to json,from setTagValue/tag_project_function
memcpy(payload, pVariant->pz, pVariant->nLen);
}else {
return -1;
}
break;
}
}
return 0;
......
......@@ -124,6 +124,21 @@ class TaosBind(ctypes.Structure):
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def json(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_JSON
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
......@@ -356,6 +371,11 @@ class TaosMultiBind(ctypes.Structure):
self.buffer_type = FieldType.C_NCHAR
self._str_to_buffer(values)
def json(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_JSON
self._str_to_buffer(values)
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
......
......@@ -25,6 +25,7 @@ class FieldType(object):
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
C_JSON = 15
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
......
......@@ -188,6 +188,9 @@ class TaosCursor(object):
if dataType.upper() == "NCHAR":
if self._description[col][1] == FieldType.C_NCHAR:
return True
if dataType.upper() == "JSON":
if self._description[col][1] == FieldType.C_JSON:
return True
return False
......
......@@ -207,6 +207,7 @@ CONVERT_FUNC = {
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python,
}
CONVERT_FUNC_BLOCK = {
......@@ -224,6 +225,7 @@ CONVERT_FUNC_BLOCK = {
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python_block,
}
# Corresponding TAOS_FIELD structure in C
......
......@@ -170,7 +170,6 @@ int32_t dnodeInitSystem() {
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump();
dnodeInitTmr();
if (dnodeCreateDir(tsLogDir) < 0) {
......@@ -190,6 +189,7 @@ int32_t dnodeInitSystem() {
return -1;
}
taosSetCoreDump();
dInfo("start to initialize TDengine");
taosInitNotes();
......
......@@ -46,6 +46,7 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string
typedef enum {
TSDB_OPTION_LOCALE,
......
......@@ -39,7 +39,7 @@ extern "C" {
#define TSKEY_INITIAL_VAL INT64_MIN
// Bytes for each type.
extern const int32_t TYPE_BYTES[15];
extern const int32_t TYPE_BYTES[16];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
......@@ -70,6 +70,11 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF
#define TSDB_DATA_BINARY_NULL 0xFF
#define TSDB_DATA_JSON_PLACEHOLDER 0x7F
#define TSDB_DATA_JSON_NULL 0xFFFFFFFF
#define TSDB_DATA_JSON_null 0xFFFFFFFE
#define TSDB_DATA_JSON_NOT_NULL 0x01
#define TSDB_DATA_JSON_CAN_NOT_COMPARE 0x7FFFFFFF
#define TSDB_DATA_UTINYINT_NULL 0xFF
#define TSDB_DATA_USMALLINT_NULL 0xFFFF
......@@ -176,6 +181,9 @@ do { \
#define TSDB_RELATION_MATCH 14
#define TSDB_RELATION_NMATCH 15
#define TSDB_RELATION_CONTAINS 16
#define TSDB_RELATION_ARROW 17
#define TSDB_BINARY_OP_ADD 30
#define TSDB_BINARY_OP_SUBTRACT 31
#define TSDB_BINARY_OP_MULTIPLY 32
......@@ -222,8 +230,11 @@ do { \
*/
#define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_JSON_TAGS_LEN (4096*TSDB_NCHAR_SIZE + 2 + 1) // 2->var_header_len 1->type
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_JSON_KEY_LEN 256
#define TSDB_MAX_JSON_KEY_MD5_LEN 16
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
......
......@@ -250,48 +250,48 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state"
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID"
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601) //"Invalid table type"
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602) //"Invalid table schema version"
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603) //"Table already exists"
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604) //"Invalid configuration"
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605) //"Tsdb init failed"
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606) //"No diskspace for tsdb"
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607) //"No permission for disk files"
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608) //"Data file(s) corrupted"
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609) //"Out of memory"
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A) //"Tag too old"
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B) //"Timestamp data out of range"
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C) //"Submit message is messed up"
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D) //"Invalid operation"
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E) //"Invalid creation of table"
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F) //"No table data in memory skiplist"
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610) //"File already exists"
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611) //"Need to reconfigure table"
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612) //"Invalid information to create table"
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk"
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message"
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value"
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data"
#define TSDB_CODE_TDB_INCOMPLETE_DFILESET TAOS_DEF_ERROR_CODE(0, 0x0617) //"TSDB incomplete DFileSet"
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601) //"Invalid table type")
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602) //"Invalid table schema version")
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603) //"Table already exists")
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604) //"Invalid configuration")
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605) //"Tsdb init failed")
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606) //"No diskspace for tsdb")
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607) //"No permission for disk files")
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608) //"Data file(s) corrupted")
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609) //"Out of memory")
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A) //"Tag too old")
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B) //"Timestamp data out of range")
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C) //"Submit message is messed up")
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D) //"Invalid operation")
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E) //"Invalid creation of table")
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F) //"No table data in memory skiplist")
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610) //"File already exists")
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611) //"Need to reconfigure table")
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612) //"Invalid information to create table")
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk")
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data")
#define TSDB_CODE_TDB_INCOMPLETE_DFILESET TAOS_DEF_ERROR_CODE(0, 0x0617) //"TSDB incomplete DFileSet")
#define TSDB_CODE_TDB_NO_JSON_TAG_KEY TAOS_DEF_ERROR_CODE(0, 0x0618) //"TSDB no tag json key")
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle"
#define TSDB_CODE_QRY_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0701) //"Invalid message" // failed to validate the sql expression msg by vnode
#define TSDB_CODE_QRY_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0702) //"No diskspace for query"
#define TSDB_CODE_QRY_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0703) //"System out of memory"
#define TSDB_CODE_QRY_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0704) //"Unexpected generic error in query"
#define TSDB_CODE_QRY_DUP_JOIN_KEY TAOS_DEF_ERROR_CODE(0, 0x0705) //"Duplicated join key"
#define TSDB_CODE_QRY_EXCEED_TAGS_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0706) //"Tag condition too many"
#define TSDB_CODE_QRY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0707) //"Query not ready"
#define TSDB_CODE_QRY_HAS_RSP TAOS_DEF_ERROR_CODE(0, 0x0708) //"Query should response"
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query"
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query"
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached"
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica"
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error"
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition"
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
#define TSDB_CODE_QRY_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0701) //"Invalid message") // failed to validate the sql expression msg by vnode
#define TSDB_CODE_QRY_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0702) //"No diskspace for query")
#define TSDB_CODE_QRY_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0703) //"System out of memory")
#define TSDB_CODE_QRY_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0704) //"Unexpected generic error in query")
#define TSDB_CODE_QRY_DUP_JOIN_KEY TAOS_DEF_ERROR_CODE(0, 0x0705) //"Duplicated join key")
#define TSDB_CODE_QRY_EXCEED_TAGS_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0706) //"Tag condition too many")
#define TSDB_CODE_QRY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0707) //"Query not ready")
#define TSDB_CODE_QRY_HAS_RSP TAOS_DEF_ERROR_CODE(0, 0x0708) //"Query should response")
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query")
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
......@@ -412,7 +412,7 @@ typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char name[TSDB_COL_NAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_MAX_JSON_KEY_LEN + 4 + 1]; // 4 meams ->'' for json tag
} SColIndex;
typedef struct SColumnFilterInfo {
......
......@@ -118,7 +118,7 @@ typedef struct {
void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
......@@ -418,10 +418,14 @@ int tsdbCompact(STsdbRepo *pRepo);
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
// for json tag
void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, int16_t bytes);
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
char* parseTagDatatoJson(void *p);
#ifdef __cplusplus
}
#endif
......
......@@ -28,195 +28,195 @@
#define TK_TIMESTAMP 10
#define TK_BINARY 11
#define TK_NCHAR 12
#define TK_OR 13
#define TK_AND 14
#define TK_NOT 15
#define TK_EQ 16
#define TK_NE 17
#define TK_ISNULL 18
#define TK_NOTNULL 19
#define TK_IS 20
#define TK_LIKE 21
#define TK_MATCH 22
#define TK_NMATCH 23
#define TK_GLOB 24
#define TK_BETWEEN 25
#define TK_IN 26
#define TK_GT 27
#define TK_GE 28
#define TK_LT 29
#define TK_LE 30
#define TK_BITAND 31
#define TK_BITOR 32
#define TK_LSHIFT 33
#define TK_RSHIFT 34
#define TK_PLUS 35
#define TK_MINUS 36
#define TK_DIVIDE 37
#define TK_TIMES 38
#define TK_STAR 39
#define TK_SLASH 40
#define TK_REM 41
#define TK_CONCAT 42
#define TK_UMINUS 43
#define TK_UPLUS 44
#define TK_BITNOT 45
#define TK_SHOW 46
#define TK_DATABASES 47
#define TK_TOPICS 48
#define TK_FUNCTIONS 49
#define TK_MNODES 50
#define TK_DNODES 51
#define TK_ACCOUNTS 52
#define TK_USERS 53
#define TK_MODULES 54
#define TK_QUERIES 55
#define TK_CONNECTIONS 56
#define TK_STREAMS 57
#define TK_VARIABLES 58
#define TK_SCORES 59
#define TK_GRANTS 60
#define TK_VNODES 61
#define TK_DOT 62
#define TK_CREATE 63
#define TK_TABLE 64
#define TK_STABLE 65
#define TK_DATABASE 66
#define TK_TABLES 67
#define TK_STABLES 68
#define TK_VGROUPS 69
#define TK_DROP 70
#define TK_TOPIC 71
#define TK_FUNCTION 72
#define TK_DNODE 73
#define TK_USER 74
#define TK_ACCOUNT 75
#define TK_USE 76
#define TK_DESCRIBE 77
#define TK_DESC 78
#define TK_ALTER 79
#define TK_PASS 80
#define TK_PRIVILEGE 81
#define TK_LOCAL 82
#define TK_COMPACT 83
#define TK_LP 84
#define TK_RP 85
#define TK_IF 86
#define TK_EXISTS 87
#define TK_AS 88
#define TK_OUTPUTTYPE 89
#define TK_AGGREGATE 90
#define TK_BUFSIZE 91
#define TK_PPS 92
#define TK_TSERIES 93
#define TK_DBS 94
#define TK_STORAGE 95
#define TK_QTIME 96
#define TK_CONNS 97
#define TK_STATE 98
#define TK_COMMA 99
#define TK_KEEP 100
#define TK_CACHE 101
#define TK_REPLICA 102
#define TK_QUORUM 103
#define TK_DAYS 104
#define TK_MINROWS 105
#define TK_MAXROWS 106
#define TK_BLOCKS 107
#define TK_CTIME 108
#define TK_WAL 109
#define TK_FSYNC 110
#define TK_COMP 111
#define TK_PRECISION 112
#define TK_UPDATE 113
#define TK_CACHELAST 114
#define TK_PARTITIONS 115
#define TK_UNSIGNED 116
#define TK_TAGS 117
#define TK_USING 118
#define TK_NULL 119
#define TK_NOW 120
#define TK_SELECT 121
#define TK_UNION 122
#define TK_ALL 123
#define TK_DISTINCT 124
#define TK_FROM 125
#define TK_VARIABLE 126
#define TK_RANGE 127
#define TK_INTERVAL 128
#define TK_EVERY 129
#define TK_SESSION 130
#define TK_STATE_WINDOW 131
#define TK_FILL 132
#define TK_SLIDING 133
#define TK_ORDER 134
#define TK_BY 135
#define TK_ASC 136
#define TK_GROUP 137
#define TK_HAVING 138
#define TK_LIMIT 139
#define TK_OFFSET 140
#define TK_SLIMIT 141
#define TK_SOFFSET 142
#define TK_WHERE 143
#define TK_RESET 144
#define TK_QUERY 145
#define TK_SYNCDB 146
#define TK_ADD 147
#define TK_COLUMN 148
#define TK_MODIFY 149
#define TK_TAG 150
#define TK_CHANGE 151
#define TK_SET 152
#define TK_KILL 153
#define TK_CONNECTION 154
#define TK_STREAM 155
#define TK_COLON 156
#define TK_ABORT 157
#define TK_AFTER 158
#define TK_ATTACH 159
#define TK_BEFORE 160
#define TK_BEGIN 161
#define TK_CASCADE 162
#define TK_CLUSTER 163
#define TK_CONFLICT 164
#define TK_COPY 165
#define TK_DEFERRED 166
#define TK_DELIMITERS 167
#define TK_DETACH 168
#define TK_EACH 169
#define TK_END 170
#define TK_EXPLAIN 171
#define TK_FAIL 172
#define TK_FOR 173
#define TK_IGNORE 174
#define TK_IMMEDIATE 175
#define TK_INITIALLY 176
#define TK_INSTEAD 177
#define TK_KEY 178
#define TK_OF 179
#define TK_RAISE 180
#define TK_REPLACE 181
#define TK_RESTRICT 182
#define TK_ROW 183
#define TK_STATEMENT 184
#define TK_TRIGGER 185
#define TK_VIEW 186
#define TK_IPTOKEN 187
#define TK_SEMI 188
#define TK_NONE 189
#define TK_PREV 190
#define TK_LINEAR 191
#define TK_IMPORT 192
#define TK_TBNAME 193
#define TK_JOIN 194
#define TK_INSERT 195
#define TK_INTO 196
#define TK_VALUES 197
#define TK_FILE 198
#define TK_JSON 13
#define TK_OR 14
#define TK_AND 15
#define TK_NOT 16
#define TK_EQ 17
#define TK_NE 18
#define TK_ISNULL 19
#define TK_NOTNULL 20
#define TK_IS 21
#define TK_LIKE 22
#define TK_MATCH 23
#define TK_NMATCH 24
#define TK_CONTAINS 25
#define TK_GLOB 26
#define TK_BETWEEN 27
#define TK_IN 28
#define TK_GT 29
#define TK_GE 30
#define TK_LT 31
#define TK_LE 32
#define TK_BITAND 33
#define TK_BITOR 34
#define TK_LSHIFT 35
#define TK_RSHIFT 36
#define TK_PLUS 37
#define TK_MINUS 38
#define TK_DIVIDE 39
#define TK_TIMES 40
#define TK_STAR 41
#define TK_SLASH 42
#define TK_REM 43
#define TK_CONCAT 44
#define TK_UMINUS 45
#define TK_UPLUS 46
#define TK_BITNOT 47
#define TK_ARROW 48
#define TK_SHOW 49
#define TK_DATABASES 50
#define TK_TOPICS 51
#define TK_FUNCTIONS 52
#define TK_MNODES 53
#define TK_DNODES 54
#define TK_ACCOUNTS 55
#define TK_USERS 56
#define TK_MODULES 57
#define TK_QUERIES 58
#define TK_CONNECTIONS 59
#define TK_STREAMS 60
#define TK_VARIABLES 61
#define TK_SCORES 62
#define TK_GRANTS 63
#define TK_VNODES 64
#define TK_DOT 65
#define TK_CREATE 66
#define TK_TABLE 67
#define TK_STABLE 68
#define TK_DATABASE 69
#define TK_TABLES 70
#define TK_STABLES 71
#define TK_VGROUPS 72
#define TK_DROP 73
#define TK_TOPIC 74
#define TK_FUNCTION 75
#define TK_DNODE 76
#define TK_USER 77
#define TK_ACCOUNT 78
#define TK_USE 79
#define TK_DESCRIBE 80
#define TK_DESC 81
#define TK_ALTER 82
#define TK_PASS 83
#define TK_PRIVILEGE 84
#define TK_LOCAL 85
#define TK_COMPACT 86
#define TK_LP 87
#define TK_RP 88
#define TK_IF 89
#define TK_EXISTS 90
#define TK_AS 91
#define TK_OUTPUTTYPE 92
#define TK_AGGREGATE 93
#define TK_BUFSIZE 94
#define TK_PPS 95
#define TK_TSERIES 96
#define TK_DBS 97
#define TK_STORAGE 98
#define TK_QTIME 99
#define TK_CONNS 100
#define TK_STATE 101
#define TK_COMMA 102
#define TK_KEEP 103
#define TK_CACHE 104
#define TK_REPLICA 105
#define TK_QUORUM 106
#define TK_DAYS 107
#define TK_MINROWS 108
#define TK_MAXROWS 109
#define TK_BLOCKS 110
#define TK_CTIME 111
#define TK_WAL 112
#define TK_FSYNC 113
#define TK_COMP 114
#define TK_PRECISION 115
#define TK_UPDATE 116
#define TK_CACHELAST 117
#define TK_PARTITIONS 118
#define TK_UNSIGNED 119
#define TK_TAGS 120
#define TK_USING 121
#define TK_NULL 122
#define TK_NOW 123
#define TK_SELECT 124
#define TK_UNION 125
#define TK_ALL 126
#define TK_DISTINCT 127
#define TK_FROM 128
#define TK_VARIABLE 129
#define TK_RANGE 130
#define TK_INTERVAL 131
#define TK_EVERY 132
#define TK_SESSION 133
#define TK_STATE_WINDOW 134
#define TK_FILL 135
#define TK_SLIDING 136
#define TK_ORDER 137
#define TK_BY 138
#define TK_ASC 139
#define TK_GROUP 140
#define TK_HAVING 141
#define TK_LIMIT 142
#define TK_OFFSET 143
#define TK_SLIMIT 144
#define TK_SOFFSET 145
#define TK_WHERE 146
#define TK_RESET 147
#define TK_QUERY 148
#define TK_SYNCDB 149
#define TK_ADD 150
#define TK_COLUMN 151
#define TK_MODIFY 152
#define TK_TAG 153
#define TK_CHANGE 154
#define TK_SET 155
#define TK_KILL 156
#define TK_CONNECTION 157
#define TK_STREAM 158
#define TK_COLON 159
#define TK_ABORT 160
#define TK_AFTER 161
#define TK_ATTACH 162
#define TK_BEFORE 163
#define TK_BEGIN 164
#define TK_CASCADE 165
#define TK_CLUSTER 166
#define TK_CONFLICT 167
#define TK_COPY 168
#define TK_DEFERRED 169
#define TK_DELIMITERS 170
#define TK_DETACH 171
#define TK_EACH 172
#define TK_END 173
#define TK_EXPLAIN 174
#define TK_FAIL 175
#define TK_FOR 176
#define TK_IGNORE 177
#define TK_IMMEDIATE 178
#define TK_INITIALLY 179
#define TK_INSTEAD 180
#define TK_KEY 181
#define TK_OF 182
#define TK_RAISE 183
#define TK_REPLACE 184
#define TK_RESTRICT 185
#define TK_ROW 186
#define TK_STATEMENT 187
#define TK_TRIGGER 188
#define TK_VIEW 189
#define TK_IPTOKEN 190
#define TK_SEMI 191
#define TK_NONE 192
#define TK_PREV 193
#define TK_LINEAR 194
#define TK_IMPORT 195
#define TK_TBNAME 196
#define TK_JOIN 197
#define TK_INSERT 198
#define TK_INTO 199
#define TK_VALUES 200
#define TK_FILE 201
#define TK_SPACE 300
......
......@@ -200,6 +200,8 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL;
case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL;
case TSDB_DATA_TYPE_JSON:
return varDataLen(val) == sizeof(int32_t) && *(uint32_t *) varDataVal(val) == TSDB_DATA_JSON_NULL;
case TSDB_DATA_TYPE_NCHAR:
return varDataLen(val) == sizeof(int32_t) && *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
case TSDB_DATA_TYPE_BINARY:
......@@ -230,10 +232,10 @@ typedef struct tDataTypeDescriptor {
int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
void (*statisFunc)(const void *pData, int32_t numofrow, int64_t *min, int64_t *max, int64_t *sum,
int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypes[15];
extern tDataTypeDescriptor tDataTypes[16];
bool isValidDataType(int32_t type);
......
......@@ -509,6 +509,7 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length);
buf[length] = 0;
fprintf(fp, "\'%s\'", buf);
......@@ -692,6 +693,7 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......@@ -805,7 +807,8 @@ static int calcColWidth(TAOS_FIELD* field, int precision) {
return MAX(field->bytes, width);
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:{
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > tsMaxBinaryDisplayWidth) {
return MAX(tsMaxBinaryDisplayWidth, width);
......
......@@ -21,7 +21,6 @@
#include "tlog.h"
#include "ttimer.h"
#include "tutil.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "dnode.h"
#include "vnode.h"
......
Subproject commit f108f5240918d0eec90debd1ff469c98ff0f25ac
Subproject commit 88346a2e4e2e9282d2ec8b8c5264ca1ec23698a1
......@@ -204,7 +204,7 @@ typedef struct SQLFunctionCtx {
SResultRowCellInfo *resultInfo;
int16_t colId;
int16_t colId; // used for user-specified constant value
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
......
......@@ -105,6 +105,7 @@ typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const v
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SDataStatis *, int16_t);
typedef int32_t (*filer_get_col_from_id)(void *, int32_t, void **);
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
typedef struct SFilterRangeCompare {
int64_t s;
......@@ -237,11 +238,12 @@ typedef struct SFilterInfo {
uint32_t blkGroupNum;
uint32_t *blkUnits;
int8_t *blkUnitRes;
void *pTable;
SFilterPCtx pctx;
} SFilterInfo;
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR)
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR || (t) == TSDB_DATA_TYPE_JSON)
#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR)
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
......@@ -286,6 +288,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
......@@ -298,6 +301,7 @@ typedef struct SFilterInfo {
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_JSON_VAL_DATA(i, u) FILTER_GET_JSON_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
......@@ -324,6 +328,7 @@ typedef struct SFilterInfo {
extern int32_t filterInitFromTree(tExprNode* tree, void **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SDataStatis *statis, int16_t numOfCols);
extern int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetJsonColFieldData(SFilterInfo *info, void *param, filer_get_col_from_name fp);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
......
......@@ -79,6 +79,15 @@ typedef struct tVariantListItem {
uint8_t sortOrder;
} tVariantListItem;
typedef struct CommonItem {
union {
tVariant pVar;
struct tSqlExpr *jsonExp;
};
bool isJsonExp;
uint8_t sortOrder;
} CommonItem;
typedef struct SIntervalVal {
int32_t token;
SStrToken interval;
......@@ -161,7 +170,6 @@ typedef struct SAlterTableInfo {
SStrToken name;
int16_t tableType;
int16_t type;
STagData tagData;
SArray *pAddColumns; // SArray<TAOS_FIELD>
SArray *varList; // set t=val or: change src dst, SArray<tVariantListItem>
} SAlterTableInfo;
......@@ -278,6 +286,7 @@ typedef struct tSqlExprItem {
bool distinct;
} tSqlExprItem;
SArray *commonItemAppend(SArray *pList, tVariant *pVar, tSqlExpr *jsonExp, bool isJsonExp, uint8_t sortOrder);
SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder);
SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index);
......
......@@ -28,6 +28,7 @@ typedef struct STblCond {
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
char tagJsonKeyName[TSDB_MAX_JSON_KEY_LEN + 1]; // for tag json key
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
......@@ -165,6 +166,7 @@ typedef struct SQueryInfo {
bool stateWindow;
bool globalMerge;
bool multigroupResult;
bool isStddev;
} SQueryInfo;
/**
......
......@@ -106,5 +106,4 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
......@@ -6,12 +6,12 @@
%default_type {SStrToken}
%extra_argument {SSqlInfo* pInfo}
%fallback ID BOOL TINYINT SMALLINT INTEGER BIGINT FLOAT DOUBLE STRING TIMESTAMP BINARY NCHAR.
%fallback ID BOOL TINYINT SMALLINT INTEGER BIGINT FLOAT DOUBLE STRING TIMESTAMP BINARY NCHAR JSON.
%left OR.
%left AND.
%right NOT.
%left EQ NE ISNULL NOTNULL IS LIKE MATCH NMATCH GLOB BETWEEN IN.
%left EQ NE ISNULL NOTNULL IS LIKE MATCH NMATCH CONTAINS GLOB BETWEEN IN.
%left GT GE LT LE.
%left BITAND BITOR LSHIFT RSHIFT.
%left PLUS MINUS.
......@@ -19,6 +19,7 @@
%left STAR SLASH REM.
%left CONCAT.
%right UMINUS UPLUS BITNOT.
%right ARROW.
%include {
#include <stdio.h>
......@@ -630,25 +631,33 @@ sliding_opt(K) ::= . {K.n = 0; K.z = NULL; K.type = 0
%type sortlist {SArray*}
%destructor sortlist {taosArrayDestroy($$);}
%type sortitem {tVariant}
%destructor sortitem {tVariantDestroy(&$$);}
orderby_opt(A) ::= . {A = 0;}
orderby_opt(A) ::= ORDER BY sortlist(X). {A = X;}
sortlist(A) ::= sortlist(X) COMMA item(Y) sortorder(Z). {
A = tVariantListAppend(X, &Y, Z);
A = commonItemAppend(X, &Y, NULL, false, Z);
}
sortlist(A) ::= sortlist(X) COMMA arrow(Y) sortorder(Z). {
A = commonItemAppend(X, NULL, Y, true, Z);
}
sortlist(A) ::= item(Y) sortorder(Z). {
A = tVariantListAppend(NULL, &Y, Z);
A = commonItemAppend(NULL, &Y, NULL, false, Z);
}
sortlist(A) ::= arrow(Y) sortorder(Z). {
A = commonItemAppend(NULL, NULL, Y, true, Z);
}
%type item {tVariant}
item(A) ::= ids(X) cpxName(Y). {
item(A) ::= ID(X). {
toTSDBType(X.type);
X.n += Y.n;
tVariantCreate(&A, &X, true);
}
item(A) ::= ID(X) DOT ID(Y). {
toTSDBType(X.type);
X.n += (1+Y.n);
tVariantCreate(&A, &X, true);
}
......@@ -667,11 +676,19 @@ groupby_opt(A) ::= . { A = 0;}
groupby_opt(A) ::= GROUP BY grouplist(X). { A = X;}
grouplist(A) ::= grouplist(X) COMMA item(Y). {
A = tVariantListAppend(X, &Y, -1);
A = commonItemAppend(X, &Y, NULL, false, -1);
}
grouplist(A) ::= grouplist(X) COMMA arrow(Y). {
A = commonItemAppend(X, NULL, Y, true, -1);
}
grouplist(A) ::= item(X). {
A = tVariantListAppend(NULL, &X, -1);
A = commonItemAppend(NULL, &X, NULL, false, -1);
}
grouplist(A) ::= arrow(X). {
A = commonItemAppend(NULL, NULL, X, true, -1);
}
//having clause, ignore the input condition in having
......@@ -765,6 +782,18 @@ expr(A) ::= expr(X) LIKE expr(Y). {A = tSqlExprCreate(X, Y, TK_LIKE); }
expr(A) ::= expr(X) MATCH expr(Y). {A = tSqlExprCreate(X, Y, TK_MATCH); }
expr(A) ::= expr(X) NMATCH expr(Y). {A = tSqlExprCreate(X, Y, TK_NMATCH); }
// contains expression
expr(A) ::= ID(X) CONTAINS STRING(Y). { tSqlExpr* S = tSqlExprCreateIdValue(pInfo, &X, TK_ID); tSqlExpr* M = tSqlExprCreateIdValue(pInfo, &Y, TK_STRING); A = tSqlExprCreate(S, M, TK_CONTAINS); }
expr(A) ::= ID(X) DOT ID(Y) CONTAINS STRING(Z). { X.n += (1+Y.n); tSqlExpr* S = tSqlExprCreateIdValue(pInfo, &X, TK_ID); tSqlExpr* M = tSqlExprCreateIdValue(pInfo, &Z, TK_STRING); A = tSqlExprCreate(S, M, TK_CONTAINS); }
// arrow expression
%type arrow {tSqlExpr*}
%destructor arrow {tSqlExprDestroy($$);}
arrow(A) ::= ID(X) ARROW STRING(Y). {tSqlExpr* S = tSqlExprCreateIdValue(pInfo, &X, TK_ID); tSqlExpr* M = tSqlExprCreateIdValue(pInfo, &Y, TK_STRING); A = tSqlExprCreate(S, M, TK_ARROW); }
arrow(A) ::= ID(X) DOT ID(Y) ARROW STRING(Z). {X.n += (1+Y.n); tSqlExpr* S = tSqlExprCreateIdValue(pInfo, &X, TK_ID); tSqlExpr* M = tSqlExprCreateIdValue(pInfo, &Z, TK_STRING); A = tSqlExprCreate(S, M, TK_ARROW); }
expr(A) ::= arrow(X). {A = X;}
//in expression
expr(A) ::= expr(X) IN LP exprlist(Y) RP. {A = tSqlExprCreate(X, (tSqlExpr*)Y, TK_IN); }
......
......@@ -2929,8 +2929,7 @@ static void date_col_output_function(SQLFunctionCtx *pCtx) {
}
static void col_project_function(SQLFunctionCtx *pCtx) {
// the number of output rows should not affect the final number of rows, so set it to be 0
if (pCtx->numOfParams == 2) {
if (pCtx->colId <= TSDB_UD_COLUMN_INDEX && pCtx->colId > TSDB_RES_COL_ID) { // user-specified constant value
return;
}
......@@ -2964,6 +2963,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert(pCtx->inputBytes == pCtx->outputBytes);
tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true);
char* data = pCtx->pOutput;
pCtx->pOutput += pCtx->outputBytes;
......
......@@ -30,6 +30,9 @@
#include "tscompression.h"
#include "qScript.h"
#include "tscLog.h"
#include "cJSON.h"
#include "tsdbMeta.h"
#include "tscUtil.h"
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
......@@ -134,6 +137,14 @@ do { \
} \
} while (0)
#define GET_JSON_KEY(exprInfo) \
char* param = NULL; \
int32_t paramLen = 0; \
if(exprInfo->base.numOfParams > 0){ \
param = exprInfo->base.param[0].pz; \
paramLen = exprInfo->base.param[0].nLen; \
}
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() {
......@@ -1205,7 +1216,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
for(int32_t j = 0; j < pBlock->info.rows; ++j) {
char* dst = p->pData + j * p->info.bytes;
tVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
tVariantDump(&pOperator->pExpr[i].base.param[0], dst, p->info.type, true);
}
}
}
......@@ -1445,33 +1456,34 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*) pColDataInfo->pData;
tsCols = (int64_t*)pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1487,7 +1499,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j);
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
......@@ -1509,8 +1522,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
// restore current time window
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1529,8 +1542,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
// null data, failed to allocate more memory buffer
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan,
&pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1540,20 +1553,18 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
}
if (pQueryAttr->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
......@@ -1902,6 +1913,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->ptsOutputBuf = NULL;
pCtx->colId = pIndex->colId;
pCtx->outputBytes = pSqlExpr->resBytes;
pCtx->outputType = pSqlExpr->resType;
......@@ -2158,7 +2170,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_MultiwayMergeSort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 200, merger); // TD-10899
break;
}
......@@ -2931,7 +2943,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
static void doSetTagValueInParam(void* pTable, char* param, int32_t paraLen, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
SQLFunctionCtx* pCtx = pTableScanInfo->pCtx;
......@@ -3008,19 +3020,22 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
if (pQueryAttr->stableQuery) { // todo refactor
SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0];
int16_t tagId = (int16_t)pExprInfo->base.param[0].i64;
int16_t tagId = (int16_t)pExprInfo->base.param[1].i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagId);
// compare tag first
tVariant t = {0};
doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes);
GET_JSON_KEY(pExprInfo)
doSetTagValueInParam(pRuntimeEnv->current->pTable, param, paramLen, tagId, &t, pColInfo->type, pColInfo->bytes);
setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) {
(*status) = BLK_DATA_DISCARD;
tVariantDestroy(&t);
return TSDB_CODE_SUCCESS;
}
tVariantDestroy(&t);
}
}
......@@ -3208,7 +3223,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) {
static void doSetTagValueInParam(void* pTable, char* param, int32_t paramLen, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) {
tVariantDestroy(tag);
char* val = NULL;
......@@ -3216,7 +3231,7 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag,
val = tsdbGetTableName(pTable);
assert(val != NULL);
} else {
val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
val = tsdbGetTableTagVal(pTable, tagColId, type);
}
if (val == NULL || isNull(val, type)) {
......@@ -3224,11 +3239,19 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag,
return;
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (IS_VAR_DATA_TYPE(type)) {
int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
int32_t len = (varDataLen(val) > maxLen)? maxLen:varDataLen(val);
tVariantCreateFromBinary(tag, varDataVal(val), len, type);
//tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
} else if(type == TSDB_DATA_TYPE_JSON){
char jsonVal[TSDB_MAX_JSON_TAGS_LEN] = {0};
if(param){
getJsonTagValueElment(pTable, param, paramLen, jsonVal, bytes);
}else{
getJsonTagValueAll(val, jsonVal, TSDB_MAX_JSON_TAGS_LEN);
}
tVariantCreateFromBinary(tag, jsonVal, bytes, type);
} else {
tVariantCreateFromBinary(tag, val, bytes, type);
}
......@@ -3254,12 +3277,12 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
SExprInfo* pExprInfo = &pExpr[0];
if (pQueryAttr->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQueryAttr->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
assert(pExprInfo->base.numOfParams == 2);
int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64;
int16_t tagColId = (int16_t)pExprInfo->base.param[1].i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
GET_JSON_KEY(pExprInfo)
doSetTagValueInParam(pTable, param, paramLen, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
return;
} else {
// set tag value, by which the results are aggregated.
......@@ -3275,7 +3298,8 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
}
// todo use tag column index to optimize performance
doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->base.resType,
GET_JSON_KEY(pLocalExprInfo)
doSetTagValueInParam(pTable, param, paramLen, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->base.resType,
pLocalExprInfo->base.resBytes);
if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resType)
......@@ -3894,20 +3918,21 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr
if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) &&
(pExpr->functionId == TSDB_FUNC_TS || pExpr->functionId == TSDB_FUNC_PRJ) &&
(pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
assert(pExpr->numOfParams == 1);
assert(pExpr->numOfParams == 2);
int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64;
int16_t tagColId = (int16_t)pExprInfo->base.param[1].i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes);
GET_JSON_KEY(pExprInfo)
doSetTagValueInParam(pTable, param, paramLen, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes);
int16_t tagType = pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR || tagType == TSDB_DATA_TYPE_JSON) {
qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%s", GET_QID(pRuntimeEnv),
pExprInfo->base.param[0].i64, pCtx[0].tag.pz);
pExprInfo->base.param[1].i64, pCtx[0].tag.pz);
} else {
qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, GET_QID(pRuntimeEnv),
pExprInfo->base.param[0].i64, pCtx[0].tag.i64);
pExprInfo->base.param[1].i64, pCtx[0].tag.i64);
}
}
}
......@@ -3925,7 +3950,7 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR || pTag->nType == TSDB_DATA_TYPE_JSON) {
qError("QInfo:0x%"PRIx64" failed to find tag:%s in ts_comp", GET_QID(pRuntimeEnv), pTag->pz);
} else {
qError("QInfo:0x%"PRIx64" failed to find tag:%" PRId64 " in ts_comp", GET_QID(pRuntimeEnv), pTag->i64);
......@@ -3936,7 +3961,7 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
// Keep the cursor info of current table
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR || pTag->nType == TSDB_DATA_TYPE_JSON) {
qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
......@@ -3944,7 +3969,7 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
} else {
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR || pTag->nType == TSDB_DATA_TYPE_JSON) {
qDebug("QInfo:0x%"PRIx64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
......@@ -5340,7 +5365,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->multiGroupResults = groupResultMixedUp;
pInfo->pMerge = param;
pInfo->bufCapacity = 4096;
pInfo->bufCapacity = 200; // TD-10899
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
......@@ -7427,7 +7452,16 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type);
if(type == TSDB_DATA_TYPE_JSON){
if(pExprInfo->base.numOfParams > 0){ // tag-> operation
getJsonTagValueElment(item->pTable, pExprInfo->base.param[0].pz, pExprInfo->base.param[0].nLen, output, bytes);
}else{
getJsonTagValueAll(data, output, bytes);
}
count += 1;
continue;
}
}
doSetTagValueToResultBuf(output, data, type, bytes);
......@@ -7463,13 +7497,20 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
type = pExprInfo[j].base.resType;
bytes = pExprInfo[j].base.resBytes;
dst = pColInfo->pData + count * pExprInfo[j].base.resBytes;
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type);
if(type == TSDB_DATA_TYPE_JSON){
if(pExprInfo[j].base.numOfParams > 0){ // tag-> operation
getJsonTagValueElment(item->pTable, pExprInfo[j].base.param[0].pz, pExprInfo[j].base.param[0].nLen, dst, bytes);
}else{
getJsonTagValueAll(data, dst, bytes);
}
continue;
}
}
dst = pColInfo->pData + count * pExprInfo[j].base.resBytes;
doSetTagValueToResultBuf(dst, data, type, bytes);
}
......@@ -8455,8 +8496,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
// it is a user-defined constant value column
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
type = pExprs[i].base.param[1].nType;
bytes = pExprs[i].base.param[1].nLen;
type = pExprs[i].base.param[0].nType;
bytes = pExprs[i].base.param[0].nLen;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
bytes += VARSTR_HEADER_SIZE;
}
......
......@@ -367,6 +367,14 @@ static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) {
}
int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
if (type == TSDB_DATA_TYPE_JSON){
bool canReturn = true;
int32_t result = jsonCompareUnit(f1, f2, &canReturn);
if(canReturn) return result;
type = *f1;
f1 += CHAR_BYTES;
f2 += CHAR_BYTES;
}
switch (type) {
case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2));
case TSDB_DATA_TYPE_DOUBLE: DEFAULT_DOUBLE_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2));
......
此差异已折叠。
此差异已折叠。
......@@ -267,7 +267,8 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
metaLen += (int32_t)fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f);
int32_t trueLen = pBlock->tag.nLen;
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR ||
pBlock->tag.nType == TSDB_DATA_TYPE_JSON) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
......@@ -349,7 +350,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
// NOTE: mix types tags are not supported
size_t sz = 0;
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR ||
pBlock->tag.nType == TSDB_DATA_TYPE_JSON) {
char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
assert(tp != NULL);
......
......@@ -23,6 +23,8 @@
#include "tlosertree.h"
#include "queryLog.h"
#include "tscompression.h"
#include "tscUtil.h"
#include "cJSON.h"
typedef struct SCompSupporter {
STableQueryInfo **pTableQueryInfo;
......@@ -587,4 +589,3 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
tfree(outputBuf);
}
}
此差异已折叠。
......@@ -3,9 +3,10 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
TARGET_LINK_LIBRARIES(tsdb tfs common tutil cJson)
IF (TD_TSDB_PLUGINS)
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册